emqx_acl_cache.erl 7.4 KB


  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_acl_cache).
  17. -include("emqx.hrl").
  18. -export([ list_acl_cache/0
  19. , get_acl_cache/2
  20. , put_acl_cache/3
  21. , cleanup_acl_cache/0
  22. , empty_acl_cache/0
  23. , dump_acl_cache/0
  24. , get_cache_max_size/0
  25. , get_cache_ttl/0
  26. , is_enabled/0
  27. , drain_cache/0
  28. ]).
  29. %% export for test
  30. -export([ cache_k/2
  31. , cache_v/1
  32. , get_cache_size/0
  33. , get_newest_key/0
  34. , get_oldest_key/0
  35. ]).
  36. -type(acl_result() :: allow | deny).
  37. -type(system_time() :: integer()).
  38. -type(cache_key() :: {emqx_types:pubsub(), emqx_types:topic()}).
  39. -type(cache_val() :: {acl_result(), system_time()}).
  40. -type(acl_cache_entry() :: {cache_key(), cache_val()}).
  41. %% Wrappers for key and value
  42. cache_k(PubSub, Topic)-> {PubSub, Topic}.
  43. cache_v(AclResult)-> {AclResult, time_now()}.
  44. drain_k() -> {?MODULE, drain_timestamp}.
  45. -spec(is_enabled() -> boolean()).
  46. is_enabled() ->
  47. application:get_env(emqx, enable_acl_cache, true).
  48. -spec(get_cache_max_size() -> integer()).
  49. get_cache_max_size() ->
  50. application:get_env(emqx, acl_cache_max_size, 32).
  51. -spec(get_cache_ttl() -> integer()).
  52. get_cache_ttl() ->
  53. application:get_env(emqx, acl_cache_ttl, 60000).
  54. -spec(list_acl_cache() -> [acl_cache_entry()]).
  55. list_acl_cache() ->
  56. cleanup_acl_cache(),
  57. map_acl_cache(fun(Cache) -> Cache end).
  58. %% We'll cleanup the cache before replacing an expired acl.
  59. -spec(get_acl_cache(emqx_types:pubsub(), emqx_topic:topic()) -> (acl_result() | not_found)).
  60. get_acl_cache(PubSub, Topic) ->
  61. case erlang:get(cache_k(PubSub, Topic)) of
  62. undefined -> not_found;
  63. {AclResult, CachedAt} ->
  64. if_expired(CachedAt,
  65. fun(false) ->
  66. AclResult;
  67. (true) ->
  68. cleanup_acl_cache(),
  69. not_found
  70. end)
  71. end.
  72. %% If the cache get full, and also the latest one
  73. %% is expired, then delete all the cache entries
  74. -spec(put_acl_cache(emqx_types:pubsub(), emqx_topic:topic(), acl_result()) -> ok).
  75. put_acl_cache(PubSub, Topic, AclResult) ->
  76. MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
  77. Size = get_cache_size(),
  78. case Size < MaxSize of
  79. true ->
  80. add_acl(PubSub, Topic, AclResult);
  81. false ->
  82. NewestK = get_newest_key(),
  83. {_AclResult, CachedAt} = erlang:get(NewestK),
  84. if_expired(CachedAt,
  85. fun(true) ->
  86. % all cache expired, cleanup first
  87. empty_acl_cache(),
  88. add_acl(PubSub, Topic, AclResult);
  89. (false) ->
  90. % cache full, perform cache replacement
  91. evict_acl_cache(),
  92. add_acl(PubSub, Topic, AclResult)
  93. end)
  94. end.
  95. %% delete all the acl entries
  96. -spec(empty_acl_cache() -> ok).
  97. empty_acl_cache() ->
  98. foreach_acl_cache(fun({CacheK, _CacheV}) -> erlang:erase(CacheK) end),
  99. set_cache_size(0),
  100. keys_queue_set(queue:new()).
  101. %% delete the oldest acl entry
  102. -spec(evict_acl_cache() -> ok).
  103. evict_acl_cache() ->
  104. OldestK = keys_queue_out(),
  105. erlang:erase(OldestK),
  106. decr_cache_size().
  107. %% cleanup all the expired cache entries
  108. -spec(cleanup_acl_cache() -> ok).
  109. cleanup_acl_cache() ->
  110. keys_queue_set(
  111. cleanup_acl(keys_queue_get())).
  112. get_oldest_key() ->
  113. keys_queue_pick(queue_front()).
  114. get_newest_key() ->
  115. keys_queue_pick(queue_rear()).
  116. get_cache_size() ->
  117. case erlang:get(acl_cache_size) of
  118. undefined -> 0;
  119. Size -> Size
  120. end.
  121. dump_acl_cache() ->
  122. map_acl_cache(fun(Cache) -> Cache end).
  123. map_acl_cache(Fun) ->
  124. [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
  125. orelse SubPub =:= subscribe].
  126. foreach_acl_cache(Fun) ->
  127. _ = map_acl_cache(Fun),
  128. ok.
  129. %% All acl cache entries added before `drain_cache()` invocation will become expired
  130. drain_cache() ->
  131. _ = persistent_term:put(drain_k(), time_now()),
  132. ok.
  133. %%--------------------------------------------------------------------
  134. %% Internal functions
  135. %%--------------------------------------------------------------------
  136. add_acl(PubSub, Topic, AclResult) ->
  137. K = cache_k(PubSub, Topic),
  138. V = cache_v(AclResult),
  139. case erlang:get(K) of
  140. undefined -> add_new_acl(K, V);
  141. {_AclResult, _CachedAt} ->
  142. update_acl(K, V)
  143. end.
  144. add_new_acl(K, V) ->
  145. erlang:put(K, V),
  146. keys_queue_in(K),
  147. incr_cache_size().
  148. update_acl(K, V) ->
  149. erlang:put(K, V),
  150. keys_queue_update(K).
  151. cleanup_acl(KeysQ) ->
  152. case queue:out(KeysQ) of
  153. {{value, OldestK}, KeysQ2} ->
  154. {_AclResult, CachedAt} = erlang:get(OldestK),
  155. if_expired(CachedAt,
  156. fun(false) -> KeysQ;
  157. (true) ->
  158. erlang:erase(OldestK),
  159. decr_cache_size(),
  160. cleanup_acl(KeysQ2)
  161. end);
  162. {empty, KeysQ} -> KeysQ
  163. end.
  164. incr_cache_size() ->
  165. erlang:put(acl_cache_size, get_cache_size() + 1), ok.
  166. decr_cache_size() ->
  167. Size = get_cache_size(),
  168. case Size > 1 of
  169. true ->
  170. erlang:put(acl_cache_size, Size-1);
  171. false ->
  172. erlang:put(acl_cache_size, 0)
  173. end,
  174. ok.
  175. set_cache_size(N) ->
  176. erlang:put(acl_cache_size, N), ok.
  177. %%% Ordered Keys Q %%%
  178. keys_queue_in(Key) ->
  179. %% delete the key first if exists
  180. KeysQ = keys_queue_get(),
  181. keys_queue_set(queue:in(Key, KeysQ)).
  182. keys_queue_out() ->
  183. case queue:out(keys_queue_get()) of
  184. {{value, OldestK}, Q2} ->
  185. keys_queue_set(Q2), OldestK;
  186. {empty, _Q} ->
  187. undefined
  188. end.
  189. keys_queue_update(Key) ->
  190. NewKeysQ = keys_queue_remove(Key, keys_queue_get()),
  191. keys_queue_set(queue:in(Key, NewKeysQ)).
  192. keys_queue_pick(Pick) ->
  193. KeysQ = keys_queue_get(),
  194. case queue:is_empty(KeysQ) of
  195. true -> undefined;
  196. false -> Pick(KeysQ)
  197. end.
  198. keys_queue_remove(Key, KeysQ) ->
  199. queue:filter(fun
  200. (K) when K =:= Key -> false; (_) -> true
  201. end, KeysQ).
  202. keys_queue_set(KeysQ) ->
  203. erlang:put(acl_keys_q, KeysQ), ok.
  204. keys_queue_get() ->
  205. case erlang:get(acl_keys_q) of
  206. undefined -> queue:new();
  207. KeysQ -> KeysQ
  208. end.
  209. queue_front() -> fun queue:get/1.
  210. queue_rear() -> fun queue:get_r/1.
  211. time_now() -> erlang:system_time(millisecond).
  212. if_expired(CachedAt, Fun) ->
  213. TTL = get_cache_ttl(),
  214. Now = time_now(),
  215. CurrentEvictTimestamp = persistent_term:get(drain_k(), 0),
  216. case CachedAt =< CurrentEvictTimestamp orelse (CachedAt + TTL) =< Now of
  217. true -> Fun(true);
  218. false -> Fun(false)
  219. end.