Browse Source

chore: use gen_tcp:recv timeout to wait socket closed

zhongwencool 1 year ago
parent
commit
660a1ce99d
2 changed files with 59 additions and 24 deletions
  1. 28 24
      apps/emqx/src/emqx_listeners.erl
  2. 31 0
      apps/emqx/test/emqx_listeners_SUITE.erl

+ 28 - 24
apps/emqx/src/emqx_listeners.erl

@@ -59,7 +59,7 @@
 -export([format_bind/1]).
 -export([format_bind/1]).
 
 
 -ifdef(TEST).
 -ifdef(TEST).
--export([certs_dir/2]).
+-export([certs_dir/2, wait_listener_stopped/1]).
 -endif.
 -endif.
 
 
 -export_type([listener_id/0]).
 -export_type([listener_id/0]).
@@ -355,21 +355,33 @@ do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) ->
 do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) ->
 do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) ->
     case cowboy:stop_listener(Id) of
     case cowboy:stop_listener(Id) of
         ok ->
         ok ->
-            wait_listener_stopped(ListenOn, 0);
+            _ = wait_listener_stopped(ListenOn),
+            ok;
         Error ->
         Error ->
             Error
             Error
     end;
     end;
 do_stop_listener(quic, Id, _Conf) ->
 do_stop_listener(quic, Id, _Conf) ->
     quicer:terminate_listener(Id).
     quicer:terminate_listener(Id).
 
 
-wait_listener_stopped(ListenOn, RetryMs) ->
+wait_listener_stopped(ListenOn) ->
+    wait_listener_stopped(ListenOn, 0).
+
+wait_listener_stopped(ListenOn, 3) ->
+    Log = #{
+        msg => "cowboy_listener_not_closed_when_stopping_listener",
+        listener => ListenOn,
+        wait_seconds => 9
+    },
+    ?SLOG(warning, Log),
+    timeout;
+wait_listener_stopped(ListenOn, RetryCount) ->
     % NOTE
     % NOTE
     % `cowboy:stop_listener/1` will not close the listening socket explicitly,
     % `cowboy:stop_listener/1` will not close the listening socket explicitly,
     % it will be closed by the runtime system **only after** the process exits.
     % it will be closed by the runtime system **only after** the process exits.
     Endpoint = maps:from_list(ip_port(ListenOn)),
     Endpoint = maps:from_list(ip_port(ListenOn)),
     case
     case
         gen_tcp:connect(
         gen_tcp:connect(
-            maps:get(ip, Endpoint, loopback),
+            maps:get(ip, Endpoint, "127.0.0.1"),
             maps:get(port, Endpoint),
             maps:get(port, Endpoint),
             [{active, false}]
             [{active, false}]
         )
         )
@@ -380,26 +392,18 @@ wait_listener_stopped(ListenOn, RetryMs) ->
             %% but don't want to crash if not, because this doesn't make any difference.
             %% but don't want to crash if not, because this doesn't make any difference.
             ok;
             ok;
         {ok, Socket} ->
         {ok, Socket} ->
-            %% NOTE
-            %% Tiny chance to get a connected socket here, when some other process
-            %% concurrently binds to the same port.
-            gen_tcp:close(Socket),
-            Log = #{
-                msg => "cowboy_listener_still_open_after_stopping_listener",
-                listener => ListenOn,
-                wait_ms => RetryMs
-            },
-            case RetryMs >= 3000 of
-                true ->
-                    %% Don't stop this listener, in case other processes start it again.
-                    ?SLOG(warning, Log),
-                    ok;
-                false ->
-                    ?SLOG(info, Log),
-                    Interval = 300,
-                    NewRetryMs = RetryMs + Interval,
-                    timer:sleep(Interval),
-                    wait_listener_stopped(ListenOn, NewRetryMs)
+            %% cowboy(ws/wss) will close the socket if we don't send packet in 5 seconds.
+            %% so we only wait 3 second here.
+            case gen_tcp:recv(Socket, 0, 3000) of
+                {ok, _} ->
+                    _ = gen_tcp:close(Socket),
+                    wait_listener_stopped(ListenOn, RetryCount + 1);
+                {error, timeout} ->
+                    _ = gen_tcp:close(Socket),
+                    wait_listener_stopped(ListenOn, RetryCount + 1);
+                {error, _} ->
+                    _ = gen_tcp:close(Socket),
+                    ok
             end
             end
     end.
     end.
 
 

+ 31 - 0
apps/emqx/test/emqx_listeners_SUITE.erl

@@ -41,6 +41,7 @@ end_per_suite(Config) ->
 init_per_testcase(Case, Config) when
 init_per_testcase(Case, Config) when
     Case =:= t_start_stop_listeners;
     Case =:= t_start_stop_listeners;
     Case =:= t_restart_listeners;
     Case =:= t_restart_listeners;
+    Case =:= t_wait_for_stop_listeners;
     Case =:= t_restart_listeners_with_hibernate_after_disabled
     Case =:= t_restart_listeners_with_hibernate_after_disabled
 ->
 ->
     ok = emqx_listeners:stop(),
     ok = emqx_listeners:stop(),
@@ -57,6 +58,36 @@ t_start_stop_listeners(_) ->
     ?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})),
     ?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})),
     ok = emqx_listeners:stop().
     ok = emqx_listeners:stop().
 
 
+t_wait_for_stop_listeners(_) ->
+    ok = emqx_listeners:start(),
+    meck:new([cowboy], [passthrough, no_history, no_link]),
+    %% mock stop_listener return ok but listen port is still open
+    meck:expect(cowboy, stop_listener, fun(_) -> ok end),
+    List = [
+        {<<"ws:default">>, {"127.0.0.1", 8083}},
+        {<<"wss:default">>, {"127.0.0.1", 8084}}
+    ],
+    lists:foreach(
+        fun({Id, ListenerOn}) ->
+            Start = erlang:system_time(seconds),
+            ok = emqx_listeners:stop_listener(Id),
+            ?assertEqual(timeout, emqx_listeners:wait_listener_stopped(ListenerOn)),
+            End = erlang:system_time(seconds),
+            ?assert(End - Start >= 9, "wait_listener_stopped should wait at least 9 seconds")
+        end,
+        List
+    ),
+    meck:unload(cowboy),
+    lists:foreach(
+        fun({Id, ListenerOn}) ->
+            ok = emqx_listeners:stop_listener(Id),
+            ?assertEqual(ok, emqx_listeners:wait_listener_stopped(ListenerOn))
+        end,
+        List
+    ),
+    ok = emqx_listeners:stop(),
+    ok.
+
 t_restart_listeners(_) ->
 t_restart_listeners(_) ->
     ok = emqx_listeners:start(),
     ok = emqx_listeners:start(),
     ok = emqx_listeners:stop(),
     ok = emqx_listeners:stop(),