emqx_crl_cache.erl 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2022-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%
  16. %% @doc EMQX CRL cache.
  17. %%--------------------------------------------------------------------
  18. -module(emqx_crl_cache).
  19. %% API
  20. -export([
  21. start_link/0,
  22. register_der_crls/2,
  23. refresh/1,
  24. evict/1,
  25. update_config/1,
  26. info/0
  27. ]).
  28. %% gen_server callbacks
  29. -export([
  30. init/1,
  31. handle_call/3,
  32. handle_cast/2,
  33. handle_info/2,
  34. terminate/2
  35. ]).
  36. -export([post_config_update/5]).
  37. %% internal exports
  38. -export([http_get/2]).
  39. -behaviour(gen_server).
  40. -behaviour(emqx_config_handler).
  41. -include("logger.hrl").
  42. -include_lib("snabbkaffe/include/snabbkaffe.hrl").
  43. -define(HTTP_TIMEOUT, timer:seconds(15)).
  44. -define(RETRY_TIMEOUT, 5_000).
  45. -ifdef(TEST).
  46. -define(MIN_REFRESH_PERIOD, timer:seconds(5)).
  47. -else.
  48. -define(MIN_REFRESH_PERIOD, timer:minutes(1)).
  49. -endif.
  50. -define(DEFAULT_REFRESH_INTERVAL, timer:minutes(15)).
  51. -define(DEFAULT_CACHE_CAPACITY, 100).
  52. -define(CONF_KEY_PATH, [crl_cache]).
  53. -type duration() :: non_neg_integer().
  54. -record(state, {
  55. refresh_timers = #{} :: #{binary() => reference()},
  56. refresh_interval = timer:minutes(15) :: duration(),
  57. http_timeout = ?HTTP_TIMEOUT :: duration(),
  58. %% keeps track of URLs by insertion time
  59. insertion_times = gb_trees:empty() :: gb_trees:tree(duration(), url()),
  60. %% the set of cached URLs, for testing if an URL is already
  61. %% registered.
  62. cached_urls = sets:new([{version, 2}]) :: sets:set(url()),
  63. cache_capacity = 100 :: pos_integer(),
  64. %% for future use
  65. extra = #{} :: map()
  66. }).
  67. -type url() :: uri_string:uri_string().
  68. -type state() :: #state{}.
  69. %%--------------------------------------------------------------------
  70. %% API
  71. %%--------------------------------------------------------------------
  72. post_config_update(?CONF_KEY_PATH, _Req, Conf, Conf, _AppEnvs) -> ok;
  73. post_config_update(?CONF_KEY_PATH, _Req, NewConf, _OldConf, _AppEnvs) -> update_config(NewConf).
  74. start_link() ->
  75. gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
  76. -spec refresh(url()) -> ok.
  77. refresh(URL) ->
  78. gen_server:cast(?MODULE, {refresh, URL}).
  79. -spec evict(url()) -> ok.
  80. evict(URL) ->
  81. gen_server:cast(?MODULE, {evict, URL}).
  82. -spec update_config(map()) -> ok.
  83. update_config(Conf) ->
  84. gen_server:cast(?MODULE, {update_config, Conf}).
  85. %% Adds CRLs in DER format to the cache and register them for periodic
  86. %% refresh.
  87. -spec register_der_crls(url(), [public_key:der_encoded()]) -> ok.
  88. register_der_crls(URL, CRLs) when is_list(CRLs) ->
  89. gen_server:cast(?MODULE, {register_der_crls, URL, CRLs}).
  90. -spec info() -> #{atom() => _}.
  91. info() ->
  92. [state | State] = tuple_to_list(sys:get_state(?MODULE)),
  93. maps:from_list(lists:zip(record_info(fields, state), State)).
  94. %%--------------------------------------------------------------------
  95. %% gen_server behaviour
  96. %%--------------------------------------------------------------------
  97. init([]) ->
  98. erlang:process_flag(trap_exit, true),
  99. ok = emqx_config_handler:add_handler(?CONF_KEY_PATH, ?MODULE),
  100. Conf = emqx:get_config(
  101. ?CONF_KEY_PATH,
  102. #{
  103. capacity => ?DEFAULT_CACHE_CAPACITY,
  104. refresh_interval => ?DEFAULT_REFRESH_INTERVAL,
  105. http_timeout => ?HTTP_TIMEOUT
  106. }
  107. ),
  108. {ok, update_state_config(Conf, #state{})}.
  109. handle_call(Call, _From, State) ->
  110. {reply, {error, {bad_call, Call}}, State}.
  111. handle_cast({evict, URL}, State0 = #state{refresh_timers = RefreshTimers0}) ->
  112. emqx_ssl_crl_cache:delete(URL),
  113. MTimer = maps:get(URL, RefreshTimers0, undefined),
  114. emqx_utils:cancel_timer(MTimer),
  115. RefreshTimers = maps:without([URL], RefreshTimers0),
  116. State = State0#state{refresh_timers = RefreshTimers},
  117. ?tp(
  118. crl_cache_evict,
  119. #{url => URL}
  120. ),
  121. {noreply, State};
  122. handle_cast({register_der_crls, URL, CRLs}, State0) ->
  123. handle_register_der_crls(State0, URL, CRLs);
  124. handle_cast({refresh, URL}, State0) ->
  125. case do_http_fetch_and_cache(URL, State0#state.http_timeout) of
  126. {error, Error} ->
  127. ?tp(crl_refresh_failure, #{error => Error, url => URL}),
  128. ?SLOG(error, #{
  129. msg => "failed_to_fetch_crl_response",
  130. url => URL,
  131. error => Error
  132. }),
  133. {noreply, ensure_timer(URL, State0, ?RETRY_TIMEOUT)};
  134. {ok, _CRLs} ->
  135. ?SLOG(debug, #{
  136. msg => "fetched_crl_response",
  137. url => URL
  138. }),
  139. {noreply, ensure_timer(URL, State0)}
  140. end;
  141. handle_cast({update_config, Conf}, State0) ->
  142. {noreply, update_state_config(Conf, State0)};
  143. handle_cast(_Cast, State) ->
  144. {noreply, State}.
  145. handle_info(
  146. {timeout, TRef, {refresh, URL}},
  147. State = #state{
  148. refresh_timers = RefreshTimers,
  149. http_timeout = HTTPTimeoutMS
  150. }
  151. ) ->
  152. case maps:get(URL, RefreshTimers, undefined) of
  153. TRef ->
  154. ?tp(debug, crl_refresh_timer, #{url => URL}),
  155. case do_http_fetch_and_cache(URL, HTTPTimeoutMS) of
  156. {error, Error} ->
  157. ?SLOG(error, #{
  158. msg => "failed_to_fetch_crl_response",
  159. url => URL,
  160. error => Error
  161. }),
  162. {noreply, ensure_timer(URL, State, ?RETRY_TIMEOUT)};
  163. {ok, _CRLs} ->
  164. ?tp(debug, crl_refresh_timer_done, #{url => URL}),
  165. {noreply, ensure_timer(URL, State)}
  166. end;
  167. _ ->
  168. {noreply, State}
  169. end;
  170. handle_info(_Info, State) ->
  171. {noreply, State}.
  172. terminate(_, _) ->
  173. emqx_config_handler:remove_handler(?CONF_KEY_PATH).
  174. %%--------------------------------------------------------------------
  175. %% internal functions
  176. %%--------------------------------------------------------------------
  177. update_state_config(Conf, State) ->
  178. #{
  179. capacity := CacheCapacity,
  180. refresh_interval := RefreshIntervalMS,
  181. http_timeout := HTTPTimeoutMS
  182. } = gather_config(Conf),
  183. State#state{
  184. cache_capacity = CacheCapacity,
  185. refresh_interval = RefreshIntervalMS,
  186. http_timeout = HTTPTimeoutMS
  187. }.
  188. http_get(URL, HTTPTimeout) ->
  189. httpc:request(
  190. get,
  191. {URL, [{"connection", "close"}]},
  192. [{timeout, HTTPTimeout}],
  193. [{body_format, binary}]
  194. ).
  195. do_http_fetch_and_cache(URL, HTTPTimeoutMS) ->
  196. ?tp(crl_http_fetch, #{crl_url => URL}),
  197. Resp = ?MODULE:http_get(URL, HTTPTimeoutMS),
  198. case Resp of
  199. {ok, {{_, 200, _}, _, Body}} ->
  200. case parse_crls(Body) of
  201. error ->
  202. {error, invalid_crl};
  203. CRLs ->
  204. %% Note: must ensure it's a string and not a
  205. %% binary because that's what the ssl manager uses
  206. %% when doing lookup.
  207. emqx_ssl_crl_cache:insert(to_string(URL), {der, CRLs}),
  208. ?tp(crl_cache_insert, #{url => URL, crls => CRLs}),
  209. {ok, CRLs}
  210. end;
  211. {ok, {{_, Code, _}, _, Body}} ->
  212. {error, {bad_response, #{code => Code, body => Body}}};
  213. {error, Error} ->
  214. {error, {http_error, Error}}
  215. end.
  216. parse_crls(Bin) ->
  217. try
  218. [CRL || {'CertificateList', CRL, not_encrypted} <- public_key:pem_decode(Bin)]
  219. catch
  220. _:_ ->
  221. error
  222. end.
  223. ensure_timer(URL, State = #state{refresh_interval = Timeout}) ->
  224. ensure_timer(URL, State, Timeout).
  225. ensure_timer(URL, State = #state{refresh_timers = RefreshTimers0}, Timeout) ->
  226. ?tp(crl_cache_ensure_timer, #{url => URL, timeout => Timeout}),
  227. MTimer = maps:get(URL, RefreshTimers0, undefined),
  228. emqx_utils:cancel_timer(MTimer),
  229. RefreshTimers = RefreshTimers0#{
  230. URL => emqx_utils:start_timer(
  231. Timeout,
  232. {refresh, URL}
  233. )
  234. },
  235. State#state{refresh_timers = RefreshTimers}.
  236. gather_config(Conf) ->
  237. RefreshIntervalMS0 = maps:get(refresh_interval, Conf),
  238. MinimumRefreshInterval = ?MIN_REFRESH_PERIOD,
  239. RefreshIntervalMS = max(RefreshIntervalMS0, MinimumRefreshInterval),
  240. Conf#{refresh_interval => RefreshIntervalMS}.
  241. -spec handle_register_der_crls(state(), url(), [public_key:der_encoded()]) -> {noreply, state()}.
  242. handle_register_der_crls(State0, URL0, CRLs) ->
  243. #state{cached_urls = CachedURLs0} = State0,
  244. URL = to_string(URL0),
  245. case sets:is_element(URL, CachedURLs0) of
  246. true ->
  247. {noreply, State0};
  248. false ->
  249. emqx_ssl_crl_cache:insert(URL, {der, CRLs}),
  250. ?tp(debug, new_crl_url_inserted, #{url => URL}),
  251. State1 = do_register_url(State0, URL),
  252. State2 = handle_cache_overflow(State1),
  253. State = ensure_timer(URL, State2),
  254. {noreply, State}
  255. end.
  256. -spec do_register_url(state(), url()) -> state().
  257. do_register_url(State0, URL) ->
  258. #state{
  259. cached_urls = CachedURLs0,
  260. insertion_times = InsertionTimes0
  261. } = State0,
  262. Now = erlang:monotonic_time(),
  263. CachedURLs = sets:add_element(URL, CachedURLs0),
  264. InsertionTimes = gb_trees:enter(Now, URL, InsertionTimes0),
  265. State0#state{
  266. cached_urls = CachedURLs,
  267. insertion_times = InsertionTimes
  268. }.
  269. -spec handle_cache_overflow(state()) -> state().
  270. handle_cache_overflow(State0) ->
  271. #state{
  272. cached_urls = CachedURLs0,
  273. insertion_times = InsertionTimes0,
  274. cache_capacity = CacheCapacity,
  275. refresh_timers = RefreshTimers0
  276. } = State0,
  277. case sets:size(CachedURLs0) > CacheCapacity of
  278. false ->
  279. State0;
  280. true ->
  281. {_Time, OldestURL, InsertionTimes} = gb_trees:take_smallest(InsertionTimes0),
  282. emqx_ssl_crl_cache:delete(OldestURL),
  283. MTimer = maps:get(OldestURL, RefreshTimers0, undefined),
  284. emqx_utils:cancel_timer(MTimer),
  285. RefreshTimers = maps:remove(OldestURL, RefreshTimers0),
  286. CachedURLs = sets:del_element(OldestURL, CachedURLs0),
  287. ?tp(debug, crl_cache_overflow, #{oldest_url => OldestURL}),
  288. State0#state{
  289. insertion_times = InsertionTimes,
  290. cached_urls = CachedURLs,
  291. refresh_timers = RefreshTimers
  292. }
  293. end.
  294. to_string(B) when is_binary(B) ->
  295. binary_to_list(B);
  296. to_string(L) when is_list(L) ->
  297. L.