Przeglądaj źródła

feat(listeners): add config handler for listeners

Shawn 4 lat temu
rodzic
commit
0d1bc6d689
2 zmienionych plików z 66 dodań i 13 usunięć
  1. 60 7
      apps/emqx/src/emqx_listeners.erl
  2. 6 6
      apps/emqx/src/emqx_map_lib.erl

+ 60 - 7
apps/emqx/src/emqx_listeners.erl

@@ -41,6 +41,10 @@
         , parse_listener_id/1
         , parse_listener_id/1
         ]).
         ]).
 
 
+-export([post_config_update/4]).
+
+-define(CONF_KEY_PATH, [listeners]).
+
 %% @doc List configured listeners.
 %% @doc List configured listeners.
 -spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]).
 -spec(list() -> [{ListenerId :: atom(), ListenerConf :: map()}]).
 list() ->
 list() ->
@@ -88,6 +92,9 @@ is_running(quic, _ListenerId, _Conf)->
 %% @doc Start all listeners.
 %% @doc Start all listeners.
 -spec(start() -> ok).
 -spec(start() -> ok).
 start() ->
 start() ->
+    %% The ?MODULE:start/0 will be called by emqx_app when emqx get started,
+    %% so we install the config handler here.
+    ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
     foreach_listeners(fun start_listener/3).
     foreach_listeners(fun start_listener/3).
 
 
 -spec start_listener(atom()) -> ok | {error, term()}.
 -spec start_listener(atom()) -> ok | {error, term()}.
@@ -102,7 +109,7 @@ start_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
             console_print("- Skip - starting listener ~s on ~s ~n due to ~p",
             console_print("- Skip - starting listener ~s on ~s ~n due to ~p",
                           [listener_id(Type, ListenerName), format_addr(Bind), Reason]);
                           [listener_id(Type, ListenerName), format_addr(Bind), Reason]);
         {ok, _} ->
         {ok, _} ->
-            console_print("Start listener ~s on ~s successfully.~n",
+            console_print("Listener ~s on ~s started.~n",
                 [listener_id(Type, ListenerName), format_addr(Bind)]);
                 [listener_id(Type, ListenerName), format_addr(Bind)]);
         {error, {already_started, Pid}} ->
         {error, {already_started, Pid}} ->
             {error, {already_started, Pid}};
             {error, {already_started, Pid}};
@@ -122,27 +129,47 @@ restart_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun restart_listener/3).
     apply_on_listener(ListenerId, fun restart_listener/3).
 
 
 -spec(restart_listener(atom(), atom(), map()) -> ok | {error, term()}).
 -spec(restart_listener(atom(), atom(), map()) -> ok | {error, term()}).
+restart_listener(Type, ListenerName, {OldConf, NewConf}) ->
+    restart_listener(Type, ListenerName, OldConf, NewConf);
 restart_listener(Type, ListenerName, Conf) ->
 restart_listener(Type, ListenerName, Conf) ->
-    case stop_listener(Type, ListenerName, Conf) of
-        ok -> start_listener(Type, ListenerName, Conf);
+    restart_listener(Type, ListenerName, Conf, Conf).
+
+restart_listener(Type, ListenerName, OldConf, NewConf) ->
+    case stop_listener(Type, ListenerName, OldConf) of
+        ok -> start_listener(Type, ListenerName, NewConf);
         Error -> Error
         Error -> Error
     end.
     end.
 
 
 %% @doc Stop all listeners.
 %% @doc Stop all listeners.
 -spec(stop() -> ok).
 -spec(stop() -> ok).
 stop() ->
 stop() ->
+    %% The ?MODULE:stop/0 will be called by emqx_app when emqx is going to shutdown,
+    %% so we uninstall the config handler here.
+    _ = emqx_config_handler:remove_handler(?CONF_KEY_PATH),
     foreach_listeners(fun stop_listener/3).
     foreach_listeners(fun stop_listener/3).
 
 
 -spec(stop_listener(atom()) -> ok | {error, term()}).
 -spec(stop_listener(atom()) -> ok | {error, term()}).
 stop_listener(ListenerId) ->
 stop_listener(ListenerId) ->
     apply_on_listener(ListenerId, fun stop_listener/3).
     apply_on_listener(ListenerId, fun stop_listener/3).
 
 
--spec(stop_listener(atom(), atom(), map()) -> ok | {error, term()}).
-stop_listener(Type, ListenerName, #{bind := ListenOn}) when Type == tcp; Type == ssl ->
+stop_listener(Type, ListenerName, #{bind := Bind} = Conf) ->
+    case do_stop_listener(Type, ListenerName, Conf) of
+        ok ->
+            console_print("Listener ~s on ~s stopped.~n",
+                [listener_id(Type, ListenerName), format_addr(Bind)]),
+            ok;
+        {error, Reason} ->
+            ?ELOG("Failed to stop listener ~s on ~s: ~0p~n",
+                  [listener_id(Type, ListenerName), format_addr(Bind), Reason]),
+            {error, Reason}
+    end.
+
+-spec(do_stop_listener(atom(), atom(), map()) -> ok | {error, term()}).
+do_stop_listener(Type, ListenerName, #{bind := ListenOn}) when Type == tcp; Type == ssl ->
     esockd:close(listener_id(Type, ListenerName), ListenOn);
     esockd:close(listener_id(Type, ListenerName), ListenOn);
-stop_listener(Type, ListenerName, _Conf) when Type == ws; Type == wss ->
+do_stop_listener(Type, ListenerName, _Conf) when Type == ws; Type == wss ->
     cowboy:stop_listener(listener_id(Type, ListenerName));
     cowboy:stop_listener(listener_id(Type, ListenerName));
-stop_listener(quic, ListenerName, _Conf) ->
+do_stop_listener(quic, ListenerName, _Conf) ->
     quicer:stop_listener(listener_id(quic, ListenerName)).
     quicer:stop_listener(listener_id(quic, ListenerName)).
 
 
 -ifndef(TEST).
 -ifndef(TEST).
@@ -201,6 +228,32 @@ do_start_listener(quic, ListenerName, #{bind := ListenOn} = Opts) ->
             {ok, {skipped, quic_app_missing}}
             {ok, {skipped, quic_app_missing}}
     end.
     end.
 
 
+%% Update the listeners at runtime
+post_config_update(_Req, NewListeners, OldListeners, _AppEnvs) ->
+    #{added := Added, removed := Removed, changed := Updated}
+        = diff_listeners(NewListeners, OldListeners),
+    perform_listener_changes(fun stop_listener/3, Removed),
+    perform_listener_changes(fun start_listener/3, Added),
+    perform_listener_changes(fun restart_listener/3, Updated).
+
+perform_listener_changes(Action, MapConfs) ->
+    lists:foreach(fun
+        ({Id, Conf}) ->
+            {Type, Name} = parse_listener_id(Id),
+            Action(Type, Name, Conf)
+        end, maps:to_list(MapConfs)).
+
+diff_listeners(NewListeners, OldListeners) ->
+    emqx_map_lib:diff_maps(flatten_listeners(NewListeners), flatten_listeners(OldListeners)).
+
+flatten_listeners(Conf0) ->
+    maps:from_list(
+        lists:append([do_flatten_listeners(Type, Conf)
+                      || {Type, Conf} <- maps:to_list(Conf0)])).
+
+do_flatten_listeners(Type, Conf0) ->
+    [{listener_id(Type, Name), Conf} || {Name, Conf} <- maps:to_list(Conf0)].
+
 esockd_opts(Type, Opts0) ->
 esockd_opts(Type, Opts0) ->
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Opts1 = maps:with([acceptors, max_connections, proxy_protocol, proxy_protocol_timeout], Opts0),
     Opts2 = case emqx_config:get_zone_conf(zone(Opts0), [rate_limit, max_conn_rate]) of
     Opts2 = case emqx_config:get_zone_conf(zone(Opts0), [rate_limit, max_conn_rate]) of

+ 6 - 6
apps/emqx/src/emqx_map_lib.erl

@@ -131,20 +131,20 @@ jsonable_map(Map, JsonableFun) ->
     deep_convert(Map, fun binary_string_kv/3, [JsonableFun]).
     deep_convert(Map, fun binary_string_kv/3, [JsonableFun]).
 
 
 -spec diff_maps(map(), map()) ->
 -spec diff_maps(map(), map()) ->
-    #{added := [map()], identical := [map()], removed := [map()],
-      changed := [#{any() => {OldValue::any(), NewValue::any()}}]}.
+    #{added := map(), identical := map(), removed := map(),
+      changed := #{any() => {OldValue::any(), NewValue::any()}}}.
 diff_maps(NewMap, OldMap) ->
 diff_maps(NewMap, OldMap) ->
-    InitR = #{identical => [], changed => [], removed => []},
+    InitR = #{identical => #{}, changed => #{}, removed => #{}},
     {Result, RemInNew} =
     {Result, RemInNew} =
         lists:foldl(fun({OldK, OldV}, {Result0 = #{identical := I, changed := U, removed := D},
         lists:foldl(fun({OldK, OldV}, {Result0 = #{identical := I, changed := U, removed := D},
                         RemNewMap}) ->
                         RemNewMap}) ->
             Result1 = case maps:find(OldK, NewMap) of
             Result1 = case maps:find(OldK, NewMap) of
                 error ->
                 error ->
-                    Result0#{removed => [#{OldK => OldV} | D]};
+                    Result0#{removed => D#{OldK => OldV}};
                 {ok, NewV} when NewV == OldV ->
                 {ok, NewV} when NewV == OldV ->
-                    Result0#{identical => [#{OldK => OldV} | I]};
+                    Result0#{identical => I#{OldK => OldV}};
                 {ok, NewV} ->
                 {ok, NewV} ->
-                    Result0#{changed => [#{OldK => {OldV, NewV}} | U]}
+                    Result0#{changed => U#{OldK => {OldV, NewV}}}
             end,
             end,
             {Result1, maps:remove(OldK, RemNewMap)}
             {Result1, maps:remove(OldK, RemNewMap)}
         end, {InitR, NewMap}, maps:to_list(OldMap)),
         end, {InitR, NewMap}, maps:to_list(OldMap)),