Procházet zdrojové kódy

fix(listen): wait until port is free when stopping ranch listeners

Simple `cowboy:stop_listener/1` will not close the listening socket
explicitly, it was the source of test flaps before this fix.
Andrew Mayorov před 2 roky
rodič
revize
23542d1262
1 změnil soubory, kde provedl 39 přidání a 19 odebrání
  1. 39 19
      apps/emqx/src/emqx_listeners.erl

+ 39 - 19
apps/emqx/src/emqx_listeners.erl

@@ -277,9 +277,8 @@ restart_listener(Type, ListenerName, Conf) ->
     restart_listener(Type, ListenerName, Conf, Conf).
     restart_listener(Type, ListenerName, Conf, Conf).
 
 
 restart_listener(Type, ListenerName, OldConf, NewConf) ->
 restart_listener(Type, ListenerName, OldConf, NewConf) ->
-    case do_stop_listener(Type, ListenerName, OldConf) of
+    case stop_listener(Type, ListenerName, OldConf) of
         ok -> start_listener(Type, ListenerName, NewConf);
         ok -> start_listener(Type, ListenerName, NewConf);
-        {error, not_found} -> start_listener(Type, ListenerName, NewConf);
         {error, Reason} -> {error, Reason}
         {error, Reason} -> {error, Reason}
     end.
     end.
 
 
@@ -296,42 +295,63 @@ stop_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun stop_listener/3).
     apply_on_listener(ListenerId, fun stop_listener/3).
 
 
 stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
 stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
-    case do_stop_listener(Type, ListenerName, Conf) of
+    Id = listener_id(Type, ListenerName),
+    ok = del_limiter_bucket(Id, Conf),
+    case do_stop_listener(Type, Id, Conf) of
         ok ->
         ok ->
             console_print(
             console_print(
                 "Listener ~ts on ~ts stopped.~n",
                 "Listener ~ts on ~ts stopped.~n",
-                [listener_id(Type, ListenerName), format_bind(Bind)]
+                [Id, format_bind(Bind)]
             ),
             ),
             ok;
             ok;
         {error, not_found} ->
         {error, not_found} ->
-            ?ELOG(
-                "Failed to stop listener ~ts on ~ts: ~0p~n",
-                [listener_id(Type, ListenerName), format_bind(Bind), already_stopped]
-            ),
             ok;
             ok;
         {error, Reason} ->
         {error, Reason} ->
             ?ELOG(
             ?ELOG(
                 "Failed to stop listener ~ts on ~ts: ~0p~n",
                 "Failed to stop listener ~ts on ~ts: ~0p~n",
-                [listener_id(Type, ListenerName), format_bind(Bind), Reason]
+                [Id, format_bind(Bind), Reason]
             ),
             ),
             {error, Reason}
             {error, Reason}
     end.
     end.
 
 
 -spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.
 -spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.
 
 
-do_stop_listener(Type, ListenerName, #{bind := ListenOn} = Conf) when Type == tcp; Type == ssl ->
-    Id = listener_id(Type, ListenerName),
-    del_limiter_bucket(Id, Conf),
+do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl ->
     esockd:close(Id, ListenOn);
     esockd:close(Id, ListenOn);
-do_stop_listener(Type, ListenerName, Conf) when Type == ws; Type == wss ->
-    Id = listener_id(Type, ListenerName),
-    del_limiter_bucket(Id, Conf),
-    cowboy:stop_listener(Id);
-do_stop_listener(quic, ListenerName, Conf) ->
-    Id = listener_id(quic, ListenerName),
-    del_limiter_bucket(Id, Conf),
+do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss ->
+    case cowboy:stop_listener(Id) of
+        ok ->
+            wait_listener_stopped(ListenOn);
+        Error ->
+            Error
+    end;
+do_stop_listener(quic, Id, _Conf) ->
     quicer:stop_listener(Id).
     quicer:stop_listener(Id).
 
 
+wait_listener_stopped(ListenOn) ->
+    % NOTE
+    % `cowboy:stop_listener/1` will not close the listening socket explicitly,
+    % it will be closed by the runtime system **only after** the process exits.
+    Endpoint = maps:from_list(ip_port(ListenOn)),
+    case
+        gen_tcp:connect(
+            maps:get(ip, Endpoint, loopback),
+            maps:get(port, Endpoint),
+            [{active, false}]
+        )
+    of
+        {error, _EConnrefused} ->
+            %% NOTE
+            %% We should get `econnrefused` here because acceptors are already dead
+            %% but don't want to crash if not, because this doesn't make any difference.
+            ok;
+        {ok, Socket} ->
+            %% NOTE
+            %% Tiny chance to get a connected socket here, when some other process
+            %% concurrently binds to the same port.
+            gen_tcp:close(Socket)
+    end.
+
 -ifndef(TEST).
 -ifndef(TEST).
 console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
 console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
 -else.
 -else.