Browse Source

feat(listen): support hot update of WS/WSS options as well

However, currently any transport option changes will cause listener to
restart, which slightly impacts the availability of the broker.
Andrew Mayorov 2 years ago
parent
commit
036048b9ab
1 changed files with 40 additions and 17 deletions
  1. 40 17
      apps/emqx/src/emqx_listeners.erl

+ 40 - 17
apps/emqx/src/emqx_listeners.erl

@@ -293,18 +293,14 @@ update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) ->
     stop_listener(Type, Name, Conf);
 update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) ->
     start_listener(Type, Name, Conf);
-update_listener(Type, Name, OldConf = #{bind := Bind}, NewConf = #{bind := Bind}) ->
+update_listener(Type, Name, OldConf, NewConf) ->
     case do_update_listener(Type, Name, OldConf, NewConf) of
         ok ->
             ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
             ok;
         {error, _Reason} ->
             restart_listener(Type, Name, OldConf, NewConf)
-    end;
-update_listener(Type, Name, OldConf, NewConf) ->
-    %% TODO
-    %% Again, we're not strictly required to drop live connections in this case.
-    restart_listener(Type, Name, OldConf, NewConf).
+    end.
 
 restart_listener(Type, Name, OldConf, NewConf) ->
     case stop_listener(Type, Name, OldConf) of
@@ -403,11 +399,11 @@ do_start_listener(Type, Name, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(
         merge_default(esockd_opts(Id, Type, Name, Opts))
     );
 %% Start MQTT/WS listener
-do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when ?COWBOY_LISTENER(Type) ->
-    Id = listener_id(Type, ListenerName),
+do_start_listener(Type, Name, Opts) when ?COWBOY_LISTENER(Type) ->
+    Id = listener_id(Type, Name),
     ok = add_limiter_bucket(Id, limiter(Opts)),
-    RanchOpts = ranch_opts(Type, ListenOn, Opts),
-    WsOpts = ws_opts(Type, ListenerName, Opts),
+    RanchOpts = ranch_opts(Type, Opts),
+    WsOpts = ws_opts(Type, Name, Opts),
     case Type of
         ws -> cowboy:start_clear(Id, RanchOpts, WsOpts);
         wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
@@ -482,10 +478,36 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
             {ok, {skipped, quic_app_missing}}
     end.
 
-do_update_listener(Type, Name, _OldConf, NewConf) when ?ESOCKD_LISTENER(Type) ->
+do_update_listener(Type, Name, OldConf, NewConf = #{bind := ListenOn}) when
+    ?ESOCKD_LISTENER(Type)
+->
+    Id = listener_id(Type, Name),
+    case maps:get(bind, OldConf) of
+        ListenOn ->
+            esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
+        _Different ->
+            %% TODO
+            %% Again, we're not strictly required to drop live connections in this case.
+            {error, not_supported}
+    end;
+do_update_listener(Type, Name, OldConf, NewConf) when
+    ?COWBOY_LISTENER(Type)
+->
     Id = listener_id(Type, Name),
-    ListenOn = maps:get(bind, NewConf),
-    esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
+    RanchOpts = ranch_opts(Type, NewConf),
+    WsOpts = ws_opts(Type, Name, NewConf),
+    case ranch_opts(Type, OldConf) of
+        RanchOpts ->
+            %% Transport options did not change, no need to touch the listener.
+            ok;
+        _Different ->
+            %% Transport options changed, we need to tear down the listener.
+            ok = ranch:suspend_listener(Id),
+            ok = ranch:set_transport_options(Id, RanchOpts)
+    end,
+    ok = ranch:set_protocol_options(Id, WsOpts),
+    %% No-op if the listener was not suspended.
+    ranch:resume_listener(Id);
 do_update_listener(_Type, _Name, _OldConf, _NewConf) ->
     {error, not_supported}.
 
@@ -591,19 +613,20 @@ esockd_opts(ListenerId, Type, Name, Opts0) ->
     ).
 
 ws_opts(Type, ListenerName, Opts) ->
-    WsPaths = [
-        {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{
+    WsPath = emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"),
+    WsRoutes = [
+        {WsPath, emqx_ws_connection, #{
             zone => zone(Opts),
             listener => {Type, ListenerName},
             limiter => limiter(Opts),
             enable_authn => enable_authn(Opts)
         }}
     ],
-    Dispatch = cowboy_router:compile([{'_', WsPaths}]),
+    Dispatch = cowboy_router:compile([{'_', WsRoutes}]),
     ProxyProto = maps:get(proxy_protocol, Opts, false),
     #{env => #{dispatch => Dispatch}, proxy_header => ProxyProto}.
 
-ranch_opts(Type, ListenOn, Opts) ->
+ranch_opts(Type, Opts = #{bind := ListenOn}) ->
     NumAcceptors = maps:get(acceptors, Opts, 4),
     MaxConnections = maps:get(max_connections, Opts, 1024),
     SocketOpts =