Просмотр исходного кода

feat(listen): support hot config update of esockd-based listeners

Andrew Mayorov 2 лет назад
Родитель
Сommit
da0f0f947e
1 измененных файлов с 107 добавлено и 110 удалено
  1. 107 110
      apps/emqx/src/emqx_listeners.erl

+ 107 - 110
apps/emqx/src/emqx_listeners.erl

@@ -55,7 +55,6 @@
 ]).
 
 -export([pre_config_update/3, post_config_update/5]).
--export([create_listener/3, remove_listener/3, update_listener/3]).
 
 -export([format_bind/1]).
 
@@ -66,6 +65,11 @@
 -export_type([listener_id/0]).
 
 -type listener_id() :: atom() | binary().
+-type listener_type() :: tcp | ssl | ws | wss | quic | dtls.
+
+-define(ESOCKD_LISTENER(T), (T == tcp orelse T == ssl)).
+-define(COWBOY_LISTENER(T), (T == ws orelse T == wss)).
+
 -define(ROOT_KEY, listeners).
 -define(CONF_KEY_PATH, [?ROOT_KEY, '?', '?']).
 -define(TYPES_STRING, ["tcp", "ssl", "ws", "wss", "quic"]).
@@ -140,15 +144,9 @@ format_raw_listeners({Type0, Conf}) ->
 
 -spec is_running(ListenerId :: atom()) -> boolean() | {error, not_found}.
 is_running(ListenerId) ->
-    case
-        [
-            Running
-         || {Id, #{running := Running}} <- list(),
-            Id =:= ListenerId
-        ]
-    of
-        [] -> {error, not_found};
-        [IsRunning] -> IsRunning
+    case lists:keyfind(ListenerId, 1, list()) of
+        {_Id, #{running := Running}} -> Running;
+        false -> {error, not_found}
     end.
 
 is_running(Type, ListenerId, Conf) when Type =:= tcp; Type =:= ssl ->
@@ -229,11 +227,10 @@ start() ->
 start_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun start_listener/3).
 
--spec start_listener(atom(), atom(), map()) -> ok | {error, term()}.
-start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
+-spec start_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
+start_listener(Type, ListenerName, #{bind := Bind, enable := true} = Conf) ->
     case do_start_listener(Type, ListenerName, Conf) of
         {ok, {skipped, Reason}} when
-            Reason =:= listener_disabled;
             Reason =:= quic_app_missing
         ->
             ?tp(listener_not_started, #{type => Type, bind => Bind, status => {skipped, Reason}}),
@@ -269,7 +266,13 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
                 )
             ),
             {error, {failed_to_start, Msg}}
-    end.
+    end;
+start_listener(Type, ListenerName, #{enable := false}) ->
+    console_print(
+        "Listener ~ts is NOT started due to: disabled.~n",
+        [listener_id(Type, ListenerName)]
+    ),
+    ok.
 
 %% @doc Restart all listeners
 -spec restart() -> ok.
@@ -280,16 +283,35 @@ restart() ->
 restart_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun restart_listener/3).
 
--spec restart_listener(atom(), atom(), map() | {map(), map()}) -> ok | {error, term()}.
-restart_listener(Type, ListenerName, {OldConf, NewConf}) ->
-    restart_listener(Type, ListenerName, OldConf, NewConf);
+-spec restart_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
 restart_listener(Type, ListenerName, Conf) ->
     restart_listener(Type, ListenerName, Conf, Conf).
 
-restart_listener(Type, ListenerName, OldConf, NewConf) ->
-    case stop_listener(Type, ListenerName, OldConf) of
-        ok -> start_listener(Type, ListenerName, NewConf);
-        {error, Reason} -> {error, Reason}
+update_listener(_Type, _Name, #{enable := false}, #{enable := false}) ->
+    ok;
+update_listener(Type, Name, Conf = #{enable := true}, #{enable := false}) ->
+    stop_listener(Type, Name, Conf);
+update_listener(Type, Name, #{enable := false}, Conf = #{enable := true}) ->
+    start_listener(Type, Name, Conf);
+update_listener(Type, Name, OldConf = #{bind := Bind}, NewConf = #{bind := Bind}) ->
+    case do_update_listener(Type, Name, OldConf, NewConf) of
+        ok ->
+            ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
+            ok;
+        {error, _Reason} ->
+            restart_listener(Type, Name, OldConf, NewConf)
+    end;
+update_listener(Type, Name, OldConf, NewConf) ->
+    %% TODO
+    %% Again, we're not strictly required to drop live connections in this case.
+    restart_listener(Type, Name, OldConf, NewConf).
+
+restart_listener(Type, Name, OldConf, NewConf) ->
+    case stop_listener(Type, Name, OldConf) of
+        ok ->
+            start_listener(Type, Name, NewConf);
+        {error, Reason} ->
+            {error, Reason}
     end.
 
 %% @doc Stop all listeners.
@@ -305,9 +327,10 @@ stop() ->
 stop_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun stop_listener/3).
 
-stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
-    Id = listener_id(Type, ListenerName),
+stop_listener(Type, Name, #{bind := Bind} = Conf) ->
+    Id = listener_id(Type, Name),
     ok = del_limiter_bucket(Id, Conf),
+    ok = unregister_ocsp_stapling_refresh(Type, Name),
     case do_stop_listener(Type, Id, Conf) of
         ok ->
             console_print(
@@ -325,11 +348,10 @@ stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
             {error, Reason}
     end.
 
--spec do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}.
-
-do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == tcp; Type == ssl ->
+-spec do_stop_listener(listener_type(), atom(), map()) -> ok | {error, term()}.
+do_stop_listener(Type, Id, #{bind := ListenOn}) when ?ESOCKD_LISTENER(Type) ->
     esockd:close(Id, ListenOn);
-do_stop_listener(Type, Id, #{bind := ListenOn}) when Type == ws; Type == wss ->
+do_stop_listener(Type, Id, #{bind := ListenOn}) when ?COWBOY_LISTENER(Type) ->
     case cowboy:stop_listener(Id) of
         ok ->
             wait_listener_stopped(ListenOn);
@@ -369,39 +391,23 @@ console_print(Fmt, Args) -> ?ULOG(Fmt, Args).
 console_print(_Fmt, _Args) -> ok.
 -endif.
 
-%% Start MQTT/TCP listener
--spec do_start_listener(atom(), atom(), map()) ->
+-spec do_start_listener(listener_type(), atom(), map()) ->
     {ok, pid() | {skipped, atom()}} | {error, term()}.
-do_start_listener(_Type, _ListenerName, #{enable := false}) ->
-    {ok, {skipped, listener_disabled}};
-do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
-    Type == tcp; Type == ssl
-->
-    Id = listener_id(Type, ListenerName),
-    Limiter = limiter(Opts),
-    add_limiter_bucket(Id, Limiter),
+%% Start MQTT/TCP listener
+do_start_listener(Type, Name, #{bind := ListenOn} = Opts) when ?ESOCKD_LISTENER(Type) ->
+    Id = listener_id(Type, Name),
+    ok = add_limiter_bucket(Id, limiter(Opts)),
     esockd:open(
         Id,
         ListenOn,
-        merge_default(esockd_opts(Id, Type, Opts)),
-        {emqx_connection, start_link, [
-            #{
-                listener => {Type, ListenerName},
-                zone => zone(Opts),
-                limiter => Limiter,
-                enable_authn => enable_authn(Opts)
-            }
-        ]}
+        merge_default(esockd_opts(Id, Type, Name, Opts))
     );
 %% Start MQTT/WS listener
-do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when
-    Type == ws; Type == wss
-->
+do_start_listener(Type, ListenerName, #{bind := ListenOn} = Opts) when ?COWBOY_LISTENER(Type) ->
     Id = listener_id(Type, ListenerName),
-    Limiter = limiter(Opts),
-    add_limiter_bucket(Id, Limiter),
+    ok = add_limiter_bucket(Id, limiter(Opts)),
     RanchOpts = ranch_opts(Type, ListenOn, Opts),
-    WsOpts = ws_opts(Type, ListenerName, Opts, Limiter),
+    WsOpts = ws_opts(Type, ListenerName, Opts),
     case Type of
         ws -> cowboy:start_clear(Id, RanchOpts, WsOpts);
         wss -> cowboy:start_tls(Id, RanchOpts, WsOpts)
@@ -476,6 +482,13 @@ do_start_listener(quic, ListenerName, #{bind := Bind} = Opts) ->
             {ok, {skipped, quic_app_missing}}
     end.
 
+do_update_listener(Type, Name, _OldConf, NewConf) when ?ESOCKD_LISTENER(Type) ->
+    Id = listener_id(Type, Name),
+    ListenOn = maps:get(bind, NewConf),
+    esockd:set_options({Id, ListenOn}, esockd_opts(Id, Type, Name, NewConf));
+do_update_listener(_Type, _Name, _OldConf, _NewConf) ->
+    {error, not_supported}.
+
 %% Update the listeners at runtime
 pre_config_update([?ROOT_KEY, Type, Name], {create, NewConf}, V) when
     V =:= undefined orelse V =:= ?TOMBSTONE_VALUE
@@ -501,69 +514,44 @@ pre_config_update([?ROOT_KEY], NewConf, _RawConf) ->
 post_config_update([?ROOT_KEY, Type, Name], {create, _Request}, NewConf, OldConf, _AppEnvs) when
     OldConf =:= undefined orelse OldConf =:= ?TOMBSTONE_TYPE
 ->
-    create_listener(Type, Name, NewConf);
+    start_listener(Type, Name, NewConf);
 post_config_update([?ROOT_KEY, Type, Name], {update, _Request}, NewConf, OldConf, _AppEnvs) ->
-    update_listener(Type, Name, {OldConf, NewConf});
+    update_listener(Type, Name, OldConf, NewConf);
 post_config_update([?ROOT_KEY, Type, Name], ?MARK_DEL, _, OldConf = #{}, _AppEnvs) ->
-    remove_listener(Type, Name, OldConf);
+    stop_listener(Type, Name, OldConf);
 post_config_update([?ROOT_KEY, Type, Name], {action, _Action, _}, NewConf, OldConf, _AppEnvs) ->
-    #{enable := NewEnabled} = NewConf,
-    #{enable := OldEnabled} = OldConf,
-    case {NewEnabled, OldEnabled} of
-        {true, true} ->
-            ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
-            restart_listener(Type, Name, {OldConf, NewConf});
-        {true, false} ->
-            ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
-            start_listener(Type, Name, NewConf);
-        {false, true} ->
-            ok = unregister_ocsp_stapling_refresh(Type, Name),
-            stop_listener(Type, Name, OldConf);
-        {false, false} ->
-            ok = unregister_ocsp_stapling_refresh(Type, Name),
-            stop_listener(Type, Name, OldConf)
-    end;
+    update_listener(Type, Name, OldConf, NewConf);
 post_config_update([?ROOT_KEY], _Request, OldConf, OldConf, _AppEnvs) ->
     ok;
 post_config_update([?ROOT_KEY], _Request, NewConf, OldConf, _AppEnvs) ->
     #{added := Added, removed := Removed, changed := Changed} = diff_confs(NewConf, OldConf),
-    Updated = lists:map(fun({{{T, N}, Old}, {_, New}}) -> {{T, N}, {Old, New}} end, Changed),
-    perform_listener_changes([
-        {fun ?MODULE:remove_listener/3, Removed},
-        {fun ?MODULE:update_listener/3, Updated},
-        {fun ?MODULE:create_listener/3, Added}
-    ]);
+    %% TODO
+    %% This currently lacks transactional semantics. If one of the changes fails,
+    %% previous changes will not be rolled back.
+    perform_listener_changes(
+        [{update, L} || L <- Changed] ++
+            [{stop, L} || L <- Removed] ++
+            [{start, L} || L <- Added]
+    );
 post_config_update(_Path, _Request, _NewConf, _OldConf, _AppEnvs) ->
     ok.
 
-create_listener(Type, Name, NewConf) ->
-    start_listener(Type, Name, NewConf).
-
-remove_listener(Type, Name, OldConf) ->
-    ok = unregister_ocsp_stapling_refresh(Type, Name),
-    stop_listener(Type, Name, OldConf).
-
-update_listener(Type, Name, {OldConf, NewConf}) ->
-    ok = maybe_unregister_ocsp_stapling_refresh(Type, Name, NewConf),
-    restart_listener(Type, Name, {OldConf, NewConf}).
-
 perform_listener_changes([]) ->
     ok;
-perform_listener_changes([{Action, ConfL} | Tasks]) ->
-    case perform_listener_changes(Action, ConfL) of
-        ok -> perform_listener_changes(Tasks);
+perform_listener_changes([{Action, Listener} | Rest]) ->
+    case perform_listener_change(Action, Listener) of
+        ok -> perform_listener_changes(Rest);
         {error, Reason} -> {error, Reason}
     end.
 
-perform_listener_changes(_Action, []) ->
-    ok;
-perform_listener_changes(Action, [{{Type, Name}, Diff} | MapConf]) ->
-    case Action(Type, Name, Diff) of
-        ok -> perform_listener_changes(Action, MapConf);
-        {error, Reason} -> {error, Reason}
-    end.
+perform_listener_change(start, {Type, Name, Conf}) ->
+    start_listener(Type, Name, Conf);
+perform_listener_change(update, {{Type, Name, ConfOld}, {_, _, ConfNew}}) ->
+    update_listener(Type, Name, ConfOld, ConfNew);
+perform_listener_change(stop, {Type, Name, Conf}) ->
+    stop_listener(Type, Name, Conf).
 
-esockd_opts(ListenerId, Type, Opts0) ->
+esockd_opts(ListenerId, Type, Name, Opts0) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Limiter = limiter(Opts0),
     Opts2 =
@@ -579,7 +567,16 @@ esockd_opts(ListenerId, Type, Opts0) ->
         end,
     Opts3 = Opts2#{
         access_rules => esockd_access_rules(maps:get(access_rules, Opts0, [])),
-        tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]}
+        tune_fun => {emqx_olp, backoff_new_conn, [zone(Opts0)]},
+        connection_mfargs =>
+            {emqx_connection, start_link, [
+                #{
+                    listener => {Type, Name},
+                    zone => zone(Opts0),
+                    limiter => Limiter,
+                    enable_authn => enable_authn(Opts0)
+                }
+            ]}
     },
     maps:to_list(
         case Type of
@@ -593,12 +590,12 @@ esockd_opts(ListenerId, Type, Opts0) ->
         end
     ).
 
-ws_opts(Type, ListenerName, Opts, Limiter) ->
+ws_opts(Type, ListenerName, Opts) ->
     WsPaths = [
         {emqx_utils_maps:deep_get([websocket, mqtt_path], Opts, "/mqtt"), emqx_ws_connection, #{
             zone => zone(Opts),
             listener => {Type, ListenerName},
-            limiter => Limiter,
+            limiter => limiter(Opts),
             enable_authn => enable_authn(Opts)
         }}
     ],
@@ -742,24 +739,24 @@ diff_confs(NewConfs, OldConfs) ->
     emqx_utils:diff_lists(
         flatten_confs(NewConfs),
         flatten_confs(OldConfs),
-        fun({Key, _}) -> Key end
+        fun({Type, Name, _}) -> {Type, Name} end
     ).
 
-flatten_confs(Conf0) ->
+flatten_confs(Confs) ->
     lists:flatmap(
-        fun({Type, Conf}) ->
-            do_flatten_confs(Type, Conf)
+        fun({Type, Listeners}) ->
+            do_flatten_confs(Type, Listeners)
         end,
-        maps:to_list(Conf0)
+        maps:to_list(Confs)
     ).
 
-do_flatten_confs(Type, Conf0) ->
+do_flatten_confs(Type, Listeners) ->
     FilterFun =
         fun
             ({_Name, ?TOMBSTONE_TYPE}) -> false;
-            ({Name, Conf}) -> {true, {{Type, Name}, Conf}}
+            ({Name, Conf}) -> {true, {Type, Name, Conf}}
         end,
-    lists:filtermap(FilterFun, maps:to_list(Conf0)).
+    lists:filtermap(FilterFun, maps:to_list(Listeners)).
 
 enable_authn(Opts) ->
     maps:get(enable_authn, Opts, true).