emqx_acl_cache.erl 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_acl_cache).
  17. -include("emqx.hrl").
  18. -export([ list_acl_cache/0
  19. , get_acl_cache/2
  20. , put_acl_cache/3
  21. , cleanup_acl_cache/0
  22. , empty_acl_cache/0
  23. , dump_acl_cache/0
  24. , get_cache_max_size/0
  25. , get_cache_ttl/0
  26. , is_enabled/0
  27. ]).
  28. %% export for test
  29. -export([ cache_k/2
  30. , cache_v/1
  31. , get_cache_size/0
  32. , get_newest_key/0
  33. , get_oldest_key/0
  34. ]).
  35. -type(acl_result() :: allow | deny).
  36. -type(system_time() :: integer()).
  37. -type(cache_key() :: {emqx_types:pubsub(), emqx_types:topic()}).
  38. -type(cache_val() :: {acl_result(), system_time()}).
  39. -type(acl_cache_entry() :: {cache_key(), cache_val()}).
  40. %% Wrappers for key and value
  41. cache_k(PubSub, Topic)-> {PubSub, Topic}.
  42. cache_v(AclResult)-> {AclResult, time_now()}.
  43. -spec(is_enabled() -> boolean()).
  44. is_enabled() ->
  45. application:get_env(emqx, enable_acl_cache, true).
  46. -spec(get_cache_max_size() -> integer()).
  47. get_cache_max_size() ->
  48. application:get_env(emqx, acl_cache_max_size, 32).
  49. -spec(get_cache_ttl() -> integer()).
  50. get_cache_ttl() ->
  51. application:get_env(emqx, acl_cache_ttl, 60000).
  52. -spec(list_acl_cache() -> [acl_cache_entry()]).
  53. list_acl_cache() ->
  54. cleanup_acl_cache(),
  55. map_acl_cache(fun(Cache) -> Cache end).
  56. %% We'll cleanup the cache before replacing an expired acl.
  57. -spec(get_acl_cache(emqx_types:pubsub(), emqx_topic:topic()) -> (acl_result() | not_found)).
  58. get_acl_cache(PubSub, Topic) ->
  59. case erlang:get(cache_k(PubSub, Topic)) of
  60. undefined -> not_found;
  61. {AclResult, CachedAt} ->
  62. if_expired(CachedAt,
  63. fun(false) ->
  64. AclResult;
  65. (true) ->
  66. cleanup_acl_cache(),
  67. not_found
  68. end)
  69. end.
  70. %% If the cache get full, and also the latest one
  71. %% is expired, then delete all the cache entries
  72. -spec(put_acl_cache(emqx_types:pubsub(), emqx_topic:topic(), acl_result()) -> ok).
  73. put_acl_cache(PubSub, Topic, AclResult) ->
  74. MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
  75. Size = get_cache_size(),
  76. if
  77. Size < MaxSize ->
  78. add_acl(PubSub, Topic, AclResult);
  79. Size =:= MaxSize ->
  80. NewestK = get_newest_key(),
  81. {_AclResult, CachedAt} = erlang:get(NewestK),
  82. if_expired(CachedAt,
  83. fun(true) ->
  84. % all cache expired, cleanup first
  85. empty_acl_cache(),
  86. add_acl(PubSub, Topic, AclResult);
  87. (false) ->
  88. % cache full, perform cache replacement
  89. evict_acl_cache(),
  90. add_acl(PubSub, Topic, AclResult)
  91. end)
  92. end.
  93. %% delete all the acl entries
  94. -spec(empty_acl_cache() -> ok).
  95. empty_acl_cache() ->
  96. map_acl_cache(fun({CacheK, _CacheV}) ->
  97. erlang:erase(CacheK)
  98. end),
  99. set_cache_size(0),
  100. keys_queue_set(queue:new()).
  101. %% delete the oldest acl entry
  102. -spec(evict_acl_cache() -> ok).
  103. evict_acl_cache() ->
  104. OldestK = keys_queue_out(),
  105. erlang:erase(OldestK),
  106. decr_cache_size().
  107. %% cleanup all the expired cache entries
  108. -spec(cleanup_acl_cache() -> ok).
  109. cleanup_acl_cache() ->
  110. keys_queue_set(
  111. cleanup_acl(keys_queue_get())).
  112. get_oldest_key() ->
  113. keys_queue_pick(queue_front()).
  114. get_newest_key() ->
  115. keys_queue_pick(queue_rear()).
  116. get_cache_size() ->
  117. case erlang:get(acl_cache_size) of
  118. undefined -> 0;
  119. Size -> Size
  120. end.
  121. dump_acl_cache() ->
  122. map_acl_cache(fun(Cache) -> Cache end).
  123. map_acl_cache(Fun) ->
  124. [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
  125. orelse SubPub =:= subscribe].
  126. %%--------------------------------------------------------------------
  127. %% Internal functions
  128. %%--------------------------------------------------------------------
  129. add_acl(PubSub, Topic, AclResult) ->
  130. K = cache_k(PubSub, Topic),
  131. V = cache_v(AclResult),
  132. case erlang:get(K) of
  133. undefined -> add_new_acl(K, V);
  134. {_AclResult, _CachedAt} ->
  135. update_acl(K, V)
  136. end.
  137. add_new_acl(K, V) ->
  138. erlang:put(K, V),
  139. keys_queue_in(K),
  140. incr_cache_size().
  141. update_acl(K, V) ->
  142. erlang:put(K, V),
  143. keys_queue_update(K).
  144. cleanup_acl(KeysQ) ->
  145. case queue:out(KeysQ) of
  146. {{value, OldestK}, KeysQ2} ->
  147. {_AclResult, CachedAt} = erlang:get(OldestK),
  148. if_expired(CachedAt,
  149. fun(false) -> KeysQ;
  150. (true) ->
  151. erlang:erase(OldestK),
  152. decr_cache_size(),
  153. cleanup_acl(KeysQ2)
  154. end);
  155. {empty, KeysQ} -> KeysQ
  156. end.
  157. incr_cache_size() ->
  158. erlang:put(acl_cache_size, get_cache_size() + 1), ok.
  159. decr_cache_size() ->
  160. Size = get_cache_size(),
  161. if Size > 1 ->
  162. erlang:put(acl_cache_size, Size-1);
  163. Size =< 1 ->
  164. erlang:put(acl_cache_size, 0)
  165. end, ok.
  166. set_cache_size(N) ->
  167. erlang:put(acl_cache_size, N), ok.
  168. %%% Ordered Keys Q %%%
  169. keys_queue_in(Key) ->
  170. %% delete the key first if exists
  171. KeysQ = keys_queue_get(),
  172. keys_queue_set(queue:in(Key, KeysQ)).
  173. keys_queue_out() ->
  174. case queue:out(keys_queue_get()) of
  175. {{value, OldestK}, Q2} ->
  176. keys_queue_set(Q2), OldestK;
  177. {empty, _Q} ->
  178. undefined
  179. end.
  180. keys_queue_update(Key) ->
  181. NewKeysQ = keys_queue_remove(Key, keys_queue_get()),
  182. keys_queue_set(queue:in(Key, NewKeysQ)).
  183. keys_queue_pick(Pick) ->
  184. KeysQ = keys_queue_get(),
  185. case queue:is_empty(KeysQ) of
  186. true -> undefined;
  187. false -> Pick(KeysQ)
  188. end.
  189. keys_queue_remove(Key, KeysQ) ->
  190. queue:filter(fun
  191. (K) when K =:= Key -> false; (_) -> true
  192. end, KeysQ).
  193. keys_queue_set(KeysQ) ->
  194. erlang:put(acl_keys_q, KeysQ), ok.
  195. keys_queue_get() ->
  196. case erlang:get(acl_keys_q) of
  197. undefined -> queue:new();
  198. KeysQ -> KeysQ
  199. end.
  200. queue_front() -> fun queue:get/1.
  201. queue_rear() -> fun queue:get_r/1.
  202. time_now() -> erlang:system_time(millisecond).
  203. if_expired(CachedAt, Fun) ->
  204. TTL = get_cache_ttl(),
  205. Now = time_now(),
  206. if (CachedAt + TTL) =< Now ->
  207. Fun(true);
  208. true ->
  209. Fun(false)
  210. end.