Просмотр исходного кода

Merge pull request #12583 from zhongwencool/wait-for-listener-stop

chore: wait_for cowboy listener release sock when stop_listener
zhongwencool 1 год назад
Родитель
Сommit
21e0ecfcce
2 измененных файлов с 60 добавлено и 7 удалено
  1. 29 7
      apps/emqx/src/emqx_listeners.erl
  2. 31 0
      apps/emqx/test/emqx_listeners_SUITE.erl

+ 29 - 7
apps/emqx/src/emqx_listeners.erl

@@ -59,7 +59,7 @@
 -export([format_bind/1]).
 -export([format_bind/1]).
 
 
 -ifdef(TEST).
 -ifdef(TEST).
--export([certs_dir/2]).
+-export([certs_dir/2, wait_listener_stopped/1]).
 -endif.
 -endif.
 
 
 -export_type([listener_id/0]).
 -export_type([listener_id/0]).
@@ -355,7 +355,8 @@ do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) ->
 do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) ->
 do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) ->
     case cowboy:stop_listener(Id) of
     case cowboy:stop_listener(Id) of
         ok ->
         ok ->
-            wait_listener_stopped(ListenOn);
+            _ = wait_listener_stopped(ListenOn),
+            ok;
         Error ->
         Error ->
             Error
             Error
     end;
     end;
@@ -363,13 +364,25 @@ do_stop_listener(quic, Id, _Conf) ->
     quicer:terminate_listener(Id).
     quicer:terminate_listener(Id).
 
 
 wait_listener_stopped(ListenOn) ->
 wait_listener_stopped(ListenOn) ->
+    wait_listener_stopped(ListenOn, 0).
+
+wait_listener_stopped(ListenOn, 3) ->
+    Log = #{
+        msg => "port_not_released_after_listener_stopped",
+        explain => "Expecting the operating system to release the port soon.",
+        listener => ListenOn,
+        wait_seconds => 9
+    },
+    ?SLOG(warning, Log),
+    timeout;
+wait_listener_stopped(ListenOn, RetryCount) ->
     % NOTE
     % NOTE
     % `cowboy:stop_listener/1` will not close the listening socket explicitly,
     % `cowboy:stop_listener/1` will not close the listening socket explicitly,
     % it will be closed by the runtime system **only after** the process exits.
     % it will be closed by the runtime system **only after** the process exits.
     Endpoint = maps:from_list(ip_port(ListenOn)),
     Endpoint = maps:from_list(ip_port(ListenOn)),
     case
     case
         gen_tcp:connect(
         gen_tcp:connect(
-            maps:get(ip, Endpoint, loopback),
+            maps:get(ip, Endpoint, "127.0.0.1"),
             maps:get(port, Endpoint),
             maps:get(port, Endpoint),
             [{active, false}]
             [{active, false}]
         )
         )
@@ -380,10 +393,19 @@ wait_listener_stopped(ListenOn) ->
             %% but don't want to crash if not, because this doesn't make any difference.
             %% but don't want to crash if not, because this doesn't make any difference.
             ok;
             ok;
         {ok, Socket} ->
         {ok, Socket} ->
-            %% NOTE
-            %% Tiny chance to get a connected socket here, when some other process
-            %% concurrently binds to the same port.
-            gen_tcp:close(Socket)
+            %% cowboy(ws/wss) will close the socket if we don't send packet in 5 seconds.
+            %% so we only wait 3 second here.
+            case gen_tcp:recv(Socket, 0, 3000) of
+                {ok, _} ->
+                    _ = gen_tcp:close(Socket),
+                    wait_listener_stopped(ListenOn, RetryCount + 1);
+                {error, timeout} ->
+                    _ = gen_tcp:close(Socket),
+                    wait_listener_stopped(ListenOn, RetryCount + 1);
+                {error, _} ->
+                    _ = gen_tcp:close(Socket),
+                    ok
+            end
     end.
     end.
 
 
 -ifndef(TEST).
 -ifndef(TEST).

+ 31 - 0
apps/emqx/test/emqx_listeners_SUITE.erl

@@ -41,6 +41,7 @@ end_per_suite(Config) ->
 init_per_testcase(Case, Config) when
 init_per_testcase(Case, Config) when
     Case =:= t_start_stop_listeners;
     Case =:= t_start_stop_listeners;
     Case =:= t_restart_listeners;
     Case =:= t_restart_listeners;
+    Case =:= t_wait_for_stop_listeners;
     Case =:= t_restart_listeners_with_hibernate_after_disabled
     Case =:= t_restart_listeners_with_hibernate_after_disabled
 ->
 ->
     ok = emqx_listeners:stop(),
     ok = emqx_listeners:stop(),
@@ -57,6 +58,36 @@ t_start_stop_listeners(_) ->
     ?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})),
     ?assertException(error, _, emqx_listeners:start_listener(ws, {"127.0.0.1", 8083}, #{})),
     ok = emqx_listeners:stop().
     ok = emqx_listeners:stop().
 
 
+t_wait_for_stop_listeners(_) ->
+    ok = emqx_listeners:start(),
+    meck:new([cowboy], [passthrough, no_history, no_link]),
+    %% mock stop_listener return ok but listen port is still open
+    meck:expect(cowboy, stop_listener, fun(_) -> ok end),
+    List = [
+        {<<"ws:default">>, {"127.0.0.1", 8083}},
+        {<<"wss:default">>, {"127.0.0.1", 8084}}
+    ],
+    lists:foreach(
+        fun({Id, ListenerOn}) ->
+            Start = erlang:system_time(seconds),
+            ok = emqx_listeners:stop_listener(Id),
+            ?assertEqual(timeout, emqx_listeners:wait_listener_stopped(ListenerOn)),
+            End = erlang:system_time(seconds),
+            ?assert(End - Start >= 9, "wait_listener_stopped should wait at least 9 seconds")
+        end,
+        List
+    ),
+    meck:unload(cowboy),
+    lists:foreach(
+        fun({Id, ListenerOn}) ->
+            ok = emqx_listeners:stop_listener(Id),
+            ?assertEqual(ok, emqx_listeners:wait_listener_stopped(ListenerOn))
+        end,
+        List
+    ),
+    ok = emqx_listeners:stop(),
+    ok.
+
 t_restart_listeners(_) ->
 t_restart_listeners(_) ->
     ok = emqx_listeners:start(),
     ok = emqx_listeners:start(),
     ok = emqx_listeners:stop(),
     ok = emqx_listeners:stop(),