emqx_authz_cache.erl 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2018-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_authz_cache).
  17. -include("emqx_access_control.hrl").
  18. -export([
  19. list_authz_cache/0,
  20. get_authz_cache/2,
  21. put_authz_cache/3,
  22. cleanup_authz_cache/0,
  23. empty_authz_cache/0,
  24. dump_authz_cache/0,
  25. get_cache_max_size/0,
  26. get_cache_ttl/0,
  27. is_enabled/0,
  28. drain_cache/0,
  29. drain_cache/1
  30. ]).
  31. %% export for test
  32. -export([
  33. cache_k/2,
  34. cache_v/1,
  35. get_cache_size/0,
  36. get_newest_key/0,
  37. get_oldest_key/0
  38. ]).
  %% Outcome of an authorization check as stored in the cache.
  39. -type authz_result() :: allow | deny.
  %% Millisecond timestamp; cache values are stamped with
  %% erlang:system_time(millisecond) via time_now/0.
  40. -type system_time() :: integer().
  %% Process-dictionary key of a cache entry, as built by cache_k/2.
  41. -type cache_key() :: {emqx_types:pubsub(), emqx_types:topic()}.
  %% Process-dictionary value of a cache entry, as built by cache_v/1:
  %% the result together with the time it was cached.
  42. -type cache_val() :: {authz_result(), system_time()}.
  %% A complete cache entry (key and value) as returned by list_authz_cache/0.
  43. -type authz_cache_entry() :: {cache_key(), cache_val()}.
  %% Key wrapper: builds the process-dictionary key under which the
  %% authorization result for the given pubsub action and topic is stored.
  cache_k(PubSub, Topic) ->
      {PubSub, Topic}.
  %% Value wrapper: pairs an authorization result with the current time
  %% (see time_now/0) so that expiry can be evaluated later.
  cache_v(AuthzResult) ->
      CachedAt = time_now(),
      {AuthzResult, CachedAt}.
  %% Key of the persistent term that holds the timestamp of the most recent
  %% global cache drain.
  drain_k() ->
      {?MODULE, drain_timestamp}.
  %% Reads the `authorization.cache.enable` setting. `false` is passed as the
  %% second argument of emqx:get_config/2 (presumably the value used when the
  %% setting is absent -- confirm against emqx:get_config/2).
  -spec is_enabled() -> boolean().
  is_enabled() ->
      ConfigPath = [authorization, cache, enable],
      emqx:get_config(ConfigPath, false).
  %% Reads the `authorization.cache.max_size` setting: the number of entries
  %% the per-process cache may hold before put_authz_cache/3 has to make room.
  -spec get_cache_max_size() -> integer().
  get_cache_max_size() ->
      ConfigPath = [authorization, cache, max_size],
      emqx:get_config(ConfigPath).
  %% Reads the `authorization.cache.ttl` setting. The value is added to
  %% millisecond timestamps in if_expired/3, so it is a duration in
  %% milliseconds.
  -spec get_cache_ttl() -> integer().
  get_cache_ttl() ->
      ConfigPath = [authorization, cache, ttl],
      emqx:get_config(ConfigPath).
  %% Returns every cache entry of the calling process after first purging the
  %% expired ones.
  -spec list_authz_cache() -> [authz_cache_entry()].
  list_authz_cache() ->
      cleanup_authz_cache(),
      Identity = fun(Entry) -> Entry end,
      map_authz_cache(Identity).
  %% Looks up the cached authorization result for a pubsub action on a topic.
  %%
  %% Returns `not_found` when nothing is cached or when the cached entry has
  %% expired. In the expired case the expired entries are purged before
  %% returning, so that a subsequent put starts from a clean state.
  -spec get_authz_cache(emqx_types:pubsub(), emqx_types:topic()) ->
      authz_result() | not_found.
  get_authz_cache(PubSub, Topic) ->
      case erlang:get(cache_k(PubSub, Topic)) of
          {AuthzResult, CachedAt} ->
              Resolve = fun
                  (true) ->
                      cleanup_authz_cache(),
                      not_found;
                  (false) ->
                      AuthzResult
              end,
              if_expired(get_cache_ttl(), CachedAt, Resolve);
          undefined ->
              not_found
      end.
  %% Stores an authorization result for a pubsub action on a topic.
  %%
  %% While the cache is below its maximum size the entry is simply added.
  %% When the cache is full, room is made first:
  %%   * if the newest entry has expired, every entry has expired, so the
  %%     whole cache is emptied;
  %%   * otherwise only the oldest entry is evicted.
  %% Crashes with `badmatch` when the configured maximum size is 0.
  -spec put_authz_cache(emqx_types:pubsub(), emqx_types:topic(), authz_result()) ->
      ok.
  put_authz_cache(PubSub, Topic, AuthzResult) ->
      MaxSize = get_cache_max_size(),
      true = (MaxSize =/= 0),
      case get_cache_size() < MaxSize of
          true ->
              ok;
          false ->
              {_NewestResult, NewestCachedAt} = erlang:get(get_newest_key()),
              MakeRoom = fun
                  (true) ->
                      % all cache expired, cleanup first
                      empty_authz_cache();
                  (false) ->
                      % cache full, perform cache replacement
                      evict_authz_cache()
              end,
              if_expired(get_cache_ttl(), NewestCachedAt, MakeRoom)
      end,
      add_authz(PubSub, Topic, AuthzResult).
  %% Deletes all authz entries: erases each one from the process dictionary,
  %% resets the size counter to 0 and replaces the key queue with an empty one.
  -spec empty_authz_cache() -> ok.
  empty_authz_cache() ->
      EraseEntry = fun({Key, _Value}) -> erlang:erase(Key) end,
      foreach_authz_cache(EraseEntry),
      set_cache_size(0),
      keys_queue_set(queue:new()).
  %% Deletes the oldest authz entry: pops the key at the front of the key
  %% queue, erases it from the process dictionary and decrements the size
  %% counter.
  -spec evict_authz_cache() -> ok.
  evict_authz_cache() ->
      erlang:erase(keys_queue_out()),
      decr_cache_size().
  %% Removes expired entries, starting from the oldest, and stores the
  %% remaining key queue back into the process dictionary.
  -spec cleanup_authz_cache() -> ok.
  cleanup_authz_cache() ->
      TTL = get_cache_ttl(),
      RemainingKeys = cleanup_authz(TTL, keys_queue_get()),
      keys_queue_set(RemainingKeys).
  %% Returns the key at the front of the key queue (the least recently
  %% added or updated entry), or `undefined` when the queue is empty.
  get_oldest_key() ->
      PickFront = queue_front(),
      keys_queue_pick(PickFront).
  %% Returns the key at the rear of the key queue (the most recently added
  %% or updated entry), or `undefined` when the queue is empty.
  get_newest_key() ->
      PickRear = queue_rear(),
      keys_queue_pick(PickRear).
  %% Returns the entry counter kept under `authz_cache_size` in the process
  %% dictionary; a process that has never cached anything reports 0.
  get_cache_size() ->
      Stored = erlang:get(authz_cache_size),
      if
          Stored =:= undefined -> 0;
          true -> Stored
      end.
  %% Returns every cache entry of the calling process as-is, without purging
  %% expired entries first (compare list_authz_cache/0).
  dump_authz_cache() ->
      Identity = fun(Entry) -> Entry end,
      map_authz_cache(Identity).
  %% Applies Fun to every process-dictionary entry whose key has the shape
  %% `{?authz_action, Topic}` and returns the list of results. All other
  %% process-dictionary entries are skipped.
  map_authz_cache(Fun) ->
      lists:filtermap(
          fun
              ({{?authz_action, _Topic}, _AuthzValue} = Entry) ->
                  {true, Fun(Entry)};
              (_NotAnAuthzEntry) ->
                  false
          end,
          erlang:get()
      ).
  %% Calls Fun on every authz cache entry for its side effects only and
  %% returns `ok`.
  foreach_authz_cache(Fun) ->
      % Collect the entries first, then apply Fun to each of them in order.
      Entries = map_authz_cache(fun(Entry) -> Entry end),
      lists:foreach(Fun, Entries).
  %% Records the current time as the drain timestamp in a persistent term.
  %% Every authz cache entry cached at or before this moment is treated as
  %% expired by if_expired/3.
  drain_cache() ->
      Key = drain_k(),
      DrainedAt = time_now(),
      persistent_term:put(Key, DrainedAt),
      ok.
  %% Asks the channel of a single client to clean its authz cache by sending
  %% `clean_authz_cache` to the last pid returned by emqx_cm:lookup_channels/1.
  %% Returns `{error, not_found}` when the client has no channel.
  -spec drain_cache(emqx_types:clientid()) -> ok | {error, not_found}.
  drain_cache(ClientId) ->
      case emqx_cm:lookup_channels(ClientId) of
          [_ | _] = ChanPids ->
              lists:last(ChanPids) ! clean_authz_cache,
              ok;
          [] ->
              {error, not_found}
      end.
  160. %%--------------------------------------------------------------------
  161. %% Internal functions
  162. %%--------------------------------------------------------------------
  %% Stores a freshly stamped result for the pubsub action and topic: refreshes
  %% the entry when the key is already cached, otherwise adds a new one.
  add_authz(PubSub, Topic, AuthzResult) ->
      Key = cache_k(PubSub, Topic),
      Value = cache_v(AuthzResult),
      case erlang:get(Key) of
          {_OldResult, _OldCachedAt} -> update_authz(Key, Value);
          undefined -> add_new_authz(Key, Value)
      end.
  %% Inserts an entry that is not cached yet: stores the value, appends the
  %% key to the rear of the key queue and increments the size counter.
  add_new_authz(Key, Value) ->
      erlang:put(Key, Value),
      keys_queue_in(Key),
      incr_cache_size().
  %% Overwrites an entry that is already cached and moves its key to the rear
  %% of the key queue. The size counter is left unchanged.
  update_authz(Key, Value) ->
      erlang:put(Key, Value),
      keys_queue_update(Key).
  %% Walks the key queue from its oldest key, erasing entries for as long as
  %% they are expired, and returns the queue of keys that remain.
  %%
  %% The walk stops at the first entry that is still valid; the queue is
  %% returned unchanged from that point on.
  cleanup_authz(TTL, KeysQ) ->
      case queue:out(KeysQ) of
          {empty, KeysQ} ->
              KeysQ;
          {{value, Key}, RestQ} ->
              {_AuthzResult, CachedAt} = erlang:get(Key),
              DropIfExpired = fun
                  (true) ->
                      erlang:erase(Key),
                      decr_cache_size(),
                      cleanup_authz(TTL, RestQ);
                  (false) ->
                      % Still valid: keep this key and everything behind it.
                      KeysQ
              end,
              if_expired(TTL, CachedAt, DropIfExpired)
      end.
  %% Adds one to the entry counter kept in the process dictionary.
  incr_cache_size() ->
      NewSize = get_cache_size() + 1,
      erlang:put(authz_cache_size, NewSize),
      ok.
  %% Subtracts one from the entry counter kept in the process dictionary,
  %% never letting it drop below 0.
  decr_cache_size() ->
      NewSize = max(get_cache_size() - 1, 0),
      erlang:put(authz_cache_size, NewSize),
      ok.
  %% Overwrites the entry counter kept in the process dictionary with N.
  set_cache_size(N) ->
      _Previous = erlang:put(authz_cache_size, N),
      ok.
  211. %%% Ordered Keys Q %%%
  %% Appends Key to the rear of the key queue. The key is not checked for
  %% prior presence in the queue; the queue is extended unconditionally.
  keys_queue_in(Key) ->
      Extended = queue:in(Key, keys_queue_get()),
      keys_queue_set(Extended).
  %% Removes the key at the front of the key queue and returns it. Returns
  %% `undefined`, leaving the stored queue untouched, when the queue is empty.
  keys_queue_out() ->
      case queue:out(keys_queue_get()) of
          {empty, _Unchanged} ->
              undefined;
          {{value, FrontKey}, RestQ} ->
              keys_queue_set(RestQ),
              FrontKey
      end.
  %% Moves Key to the rear of the key queue: every occurrence is removed and
  %% the key is appended again, marking it as the newest entry.
  keys_queue_update(Key) ->
      WithoutKey = keys_queue_remove(Key, keys_queue_get()),
      Requeued = queue:in(Key, WithoutKey),
      keys_queue_set(Requeued).
  %% Applies Pick (a fun taking a non-empty queue) to the key queue and
  %% returns its result, or `undefined` when the queue is empty.
  keys_queue_pick(Pick) ->
      Queue = keys_queue_get(),
      case queue:is_empty(Queue) of
          false -> Pick(Queue);
          true -> undefined
      end.
  %% Returns KeysQ without any element that is exactly equal to Key; the
  %% order of the remaining elements is preserved.
  keys_queue_remove(Key, KeysQ) ->
      IsOtherKey = fun(Candidate) -> Candidate =/= Key end,
      queue:filter(IsOtherKey, KeysQ).
  %% Stores KeysQ as the key queue of the calling process (under
  %% `authz_keys_q` in the process dictionary).
  keys_queue_set(KeysQ) ->
      _Previous = erlang:put(authz_keys_q, KeysQ),
      ok.
  %% Returns the key queue of the calling process, or a fresh empty queue
  %% when none has been stored yet.
  keys_queue_get() ->
      Stored = erlang:get(authz_keys_q),
      case Stored =:= undefined of
          true -> queue:new();
          false -> Stored
      end.
  %% Picker for the front (oldest) element of a non-empty queue.
  queue_front() ->
      fun queue:get/1.
  %% Picker for the rear (newest) element of a non-empty queue.
  queue_rear() ->
      fun queue:get_r/1.
  %% Current Erlang system time in milliseconds; used to stamp cache entries
  %% and drain requests.
  time_now() ->
      erlang:system_time(millisecond).
  %% Decides whether an entry cached at CachedAt has expired and calls Fun with
  %% the verdict (`true` or `false`), returning whatever Fun returns.
  %%
  %% An entry counts as expired when it was cached at or before the most recent
  %% drain timestamp (0 when no drain has been recorded), or when TTL
  %% milliseconds or more have passed since it was cached.
  if_expired(TTL, CachedAt, Fun) ->
      Now = time_now(),
      DrainedAt = persistent_term:get(drain_k(), 0),
      Expired = CachedAt =< DrainedAt orelse (CachedAt + TTL) =< Now,
      Fun(Expired).
  258. end.