Przeglądaj źródła

Merge pull request #10894 from zhongwencool/crl-cache-conf-hot-update

feat: update crl_cache conf at runtime
zhongwencool 2 lat temu
rodzic
commit
26c2ab0bef

+ 51 - 40
apps/emqx/src/emqx_crl_cache.erl

@@ -21,10 +21,10 @@
 %% API
 -export([
     start_link/0,
-    start_link/1,
     register_der_crls/2,
     refresh/1,
-    evict/1
+    evict/1,
+    update_config/1
 ]).
 
 %% gen_server callbacks
@@ -32,13 +32,17 @@
     init/1,
     handle_call/3,
     handle_cast/2,
-    handle_info/2
+    handle_info/2,
+    terminate/2
 ]).
 
+-export([post_config_update/5]).
+
 %% internal exports
 -export([http_get/2]).
 
 -behaviour(gen_server).
+-behaviour(emqx_config_handler).
 
 -include("logger.hrl").
 -include_lib("snabbkaffe/include/snabbkaffe.hrl").
@@ -52,6 +56,7 @@
 -endif.
 -define(DEFAULT_REFRESH_INTERVAL, timer:minutes(15)).
 -define(DEFAULT_CACHE_CAPACITY, 100).
+-define(CONF_KEY_PATH, [crl_cache]).
 
 -record(state, {
     refresh_timers = #{} :: #{binary() => timer:tref()},
@@ -73,12 +78,11 @@
 %% API
 %%--------------------------------------------------------------------
 
-start_link() ->
-    Config = gather_config(),
-    start_link(Config).
+post_config_update(?CONF_KEY_PATH, _Req, Conf, Conf, _AppEnvs) -> ok;
+post_config_update(?CONF_KEY_PATH, _Req, NewConf, _OldConf, _AppEnvs) -> update_config(NewConf).
 
-start_link(Config = #{cache_capacity := _, refresh_interval := _, http_timeout := _}) ->
-    gen_server:start_link({local, ?MODULE}, ?MODULE, Config, []).
+start_link() ->
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 -spec refresh(url()) -> ok.
 refresh(URL) ->
@@ -88,6 +92,10 @@ refresh(URL) ->
 evict(URL) ->
     gen_server:cast(?MODULE, {evict, URL}).
 
+-spec update_config(map()) -> ok.
+update_config(Conf) ->
+    gen_server:cast(?MODULE, {update_config, Conf}).
+
 %% Adds CRLs in DER format to the cache and register them for periodic
 %% refresh.
 -spec register_der_crls(url(), [public_key:der_encoded()]) -> ok.
@@ -98,18 +106,18 @@ register_der_crls(URL, CRLs) when is_list(CRLs) ->
 %% gen_server behaviour
 %%--------------------------------------------------------------------
 
-init(Config) ->
-    #{
-        cache_capacity := CacheCapacity,
-        refresh_interval := RefreshIntervalMS,
-        http_timeout := HTTPTimeoutMS
-    } = Config,
-    State = #state{
-        cache_capacity = CacheCapacity,
-        refresh_interval = RefreshIntervalMS,
-        http_timeout = HTTPTimeoutMS
-    },
-    {ok, State}.
+init([]) ->
+    erlang:process_flag(trap_exit, true),
+    ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
+    Conf = emqx:get_config(
+        ?CONF_KEY_PATH,
+        #{
+            capacity => ?DEFAULT_CACHE_CAPACITY,
+            refresh_interval => ?DEFAULT_REFRESH_INTERVAL,
+            http_timeout => ?HTTP_TIMEOUT
+        }
+    ),
+    {ok, update_state_config(Conf, #state{})}.
 
 handle_call(Call, _From, State) ->
     {reply, {error, {bad_call, Call}}, State}.
@@ -144,6 +152,8 @@ handle_cast({refresh, URL}, State0) ->
             }),
             {noreply, ensure_timer(URL, State0)}
     end;
+handle_cast({update_config, Conf}, State0) ->
+    {noreply, update_state_config(Conf, State0)};
 handle_cast(_Cast, State) ->
     {noreply, State}.
 
@@ -175,10 +185,25 @@ handle_info(
 handle_info(_Info, State) ->
     {noreply, State}.
 
+terminate(_, _) ->
+    emqx_config_handler:remove_handler(?CONF_KEY_PATH).
+
 %%--------------------------------------------------------------------
 %% internal functions
 %%--------------------------------------------------------------------
 
+update_state_config(Conf, State) ->
+    #{
+        capacity := CacheCapacity,
+        refresh_interval := RefreshIntervalMS,
+        http_timeout := HTTPTimeoutMS
+    } = gather_config(Conf),
+    State#state{
+        cache_capacity = CacheCapacity,
+        refresh_interval = RefreshIntervalMS,
+        http_timeout = HTTPTimeoutMS
+    }.
+
 http_get(URL, HTTPTimeout) ->
     httpc:request(
         get,
@@ -198,7 +223,7 @@ do_http_fetch_and_cache(URL, HTTPTimeoutMS) ->
                 CRLs ->
                     %% Note: must ensure it's a string and not a
                     %% binary because that's what the ssl manager uses
-                    %% when doing lookups.
+                    %% when doing lookup.
                     emqx_ssl_crl_cache:insert(to_string(URL), {der, CRLs}),
                     ?tp(crl_cache_insert, #{url => URL, crls => CRLs}),
                     {ok, CRLs}
@@ -232,25 +257,11 @@ ensure_timer(URL, State = #state{refresh_timers = RefreshTimers0}, Timeout) ->
     },
     State#state{refresh_timers = RefreshTimers}.
 
--spec gather_config() ->
-    #{
-        cache_capacity := pos_integer(),
-        refresh_interval := timer:time(),
-        http_timeout := timer:time()
-    }.
-gather_config() ->
-    %% TODO: add a config handler to refresh the config when those
-    %% globals change?
-    CacheCapacity = emqx_config:get([crl_cache, capacity], ?DEFAULT_CACHE_CAPACITY),
-    RefreshIntervalMS0 = emqx_config:get([crl_cache, refresh_interval], ?DEFAULT_REFRESH_INTERVAL),
-    MinimumRefreshInverval = ?MIN_REFRESH_PERIOD,
-    RefreshIntervalMS = max(RefreshIntervalMS0, MinimumRefreshInverval),
-    HTTPTimeoutMS = emqx_config:get([crl_cache, http_timeout], ?HTTP_TIMEOUT),
-    #{
-        cache_capacity => CacheCapacity,
-        refresh_interval => RefreshIntervalMS,
-        http_timeout => HTTPTimeoutMS
-    }.
+gather_config(Conf) ->
+    RefreshIntervalMS0 = maps:get(refresh_interval, Conf),
+    MinimumRefreshInterval = ?MIN_REFRESH_PERIOD,
+    RefreshIntervalMS = max(RefreshIntervalMS0, MinimumRefreshInterval),
+    Conf#{refresh_interval => RefreshIntervalMS}.
 
 -spec handle_register_der_crls(state(), url(), [public_key:der_encoded()]) -> {noreply, state()}.
 handle_register_der_crls(State0, URL0, CRLs) ->

+ 1 - 0
apps/emqx/test/emqx_common_test_helpers.erl

@@ -540,6 +540,7 @@ load_config(SchemaModule, Config) ->
             false -> Config
         end,
     ok = emqx_config:delete_override_conf_files(),
+    ok = copy_acl_conf(),
     ok = emqx_config:init_load(SchemaModule, ConfigBin).
 
 -spec is_all_tcp_servers_available(Servers) -> Result when

+ 34 - 0
apps/emqx/test/emqx_crl_cache_SUITE.erl

@@ -481,6 +481,7 @@ ensure_ssl_manager_alive() ->
 t_init_empty_urls(_Config) ->
     Ref = get_crl_cache_table(),
     ?assertEqual([], ets:tab2list(Ref)),
+    emqx_config_handler:start_link(),
     ?assertMatch({ok, _}, emqx_crl_cache:start_link()),
     receive
         {http_get, _} ->
@@ -488,12 +489,34 @@ t_init_empty_urls(_Config) ->
     after 1000 -> ok
     end,
     ?assertEqual([], ets:tab2list(Ref)),
+    emqx_config_handler:stop(),
+    ok.
+
+t_update_config(_Config) ->
+    emqx_config:save_schema_mod_and_names(emqx_schema),
+    emqx_config_handler:start_link(),
+    {ok, Pid} = emqx_crl_cache:start_link(),
+    Conf = #{
+        refresh_interval => timer:minutes(5),
+        http_timeout => timer:minutes(10),
+        capacity => 123
+    },
+    ?assertMatch({ok, _}, emqx:update_config([<<"crl_cache">>], Conf)),
+    State = sys:get_state(Pid),
+    ?assertEqual(Conf, #{
+        refresh_interval => element(3, State),
+        http_timeout => element(4, State),
+        capacity => element(7, State)
+    }),
+    emqx_config:erase(<<"crl_cache">>),
+    emqx_config_handler:stop(),
     ok.
 
 t_manual_refresh(Config) ->
     CRLDer = ?config(crl_der, Config),
     Ref = get_crl_cache_table(),
     ?assertEqual([], ets:tab2list(Ref)),
+    emqx_config_handler:start_link(),
     {ok, _} = emqx_crl_cache:start_link(),
     URL = "http://localhost/crl.pem",
     ok = snabbkaffe:start_trace(),
@@ -507,6 +530,7 @@ t_manual_refresh(Config) ->
         [{"crl.pem", [CRLDer]}],
         ets:tab2list(Ref)
     ),
+    emqx_config_handler:stop(),
     ok.
 
 t_refresh_request_error(_Config) ->
@@ -517,6 +541,7 @@ t_refresh_request_error(_Config) ->
             {ok, {{"HTTP/1.0", 404, 'Not Found'}, [], <<"not found">>}}
         end
     ),
+    emqx_config_handler:start_link(),
     {ok, _} = emqx_crl_cache:start_link(),
     URL = "http://localhost/crl.pem",
     ?check_trace(
@@ -534,6 +559,7 @@ t_refresh_request_error(_Config) ->
         end
     ),
     ok = snabbkaffe:stop(),
+    emqx_config_handler:stop(),
     ok.
 
 t_refresh_invalid_response(_Config) ->
@@ -544,6 +570,7 @@ t_refresh_invalid_response(_Config) ->
             {ok, {{"HTTP/1.0", 200, 'OK'}, [], <<"not a crl">>}}
         end
     ),
+    emqx_config_handler:start_link(),
     {ok, _} = emqx_crl_cache:start_link(),
     URL = "http://localhost/crl.pem",
     ?check_trace(
@@ -561,6 +588,7 @@ t_refresh_invalid_response(_Config) ->
         end
     ),
     ok = snabbkaffe:stop(),
+    emqx_config_handler:stop(),
     ok.
 
 t_refresh_http_error(_Config) ->
@@ -571,6 +599,7 @@ t_refresh_http_error(_Config) ->
             {error, timeout}
         end
     ),
+    emqx_config_handler:start_link(),
     {ok, _} = emqx_crl_cache:start_link(),
     URL = "http://localhost/crl.pem",
     ?check_trace(
@@ -588,16 +617,20 @@ t_refresh_http_error(_Config) ->
         end
     ),
     ok = snabbkaffe:stop(),
+    emqx_config_handler:stop(),
     ok.
 
 t_unknown_messages(_Config) ->
+    emqx_config_handler:start_link(),
     {ok, Server} = emqx_crl_cache:start_link(),
     gen_server:call(Server, foo),
     gen_server:cast(Server, foo),
     Server ! foo,
+    emqx_config_handler:stop(),
     ok.
 
 t_evict(_Config) ->
+    emqx_config_handler:start_link(),
     {ok, _} = emqx_crl_cache:start_link(),
     URL = "http://localhost/crl.pem",
     ?wait_async_action(
@@ -612,6 +645,7 @@ t_evict(_Config) ->
         #{?snk_kind := crl_cache_evict}
     ),
     ?assertEqual([], ets:tab2list(Ref)),
+    emqx_config_handler:stop(),
     ok.
 
 t_cache(Config) ->