emqx_acl_cache.erl 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. %%--------------------------------------------------------------------
  2. %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
  3. %%
  4. %% Licensed under the Apache License, Version 2.0 (the "License");
  5. %% you may not use this file except in compliance with the License.
  6. %% You may obtain a copy of the License at
  7. %%
  8. %% http://www.apache.org/licenses/LICENSE-2.0
  9. %%
  10. %% Unless required by applicable law or agreed to in writing, software
  11. %% distributed under the License is distributed on an "AS IS" BASIS,
  12. %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. %% See the License for the specific language governing permissions and
  14. %% limitations under the License.
  15. %%--------------------------------------------------------------------
  16. -module(emqx_acl_cache).
  17. -include("emqx.hrl").
  18. -export([ get_acl_cache/2
  19. , put_acl_cache/3
  20. , cleanup_acl_cache/0
  21. , empty_acl_cache/0
  22. , dump_acl_cache/0
  23. , get_cache_size/0
  24. , get_cache_max_size/0
  25. , get_newest_key/0
  26. , get_oldest_key/0
  27. , cache_k/2
  28. , cache_v/1
  29. , is_enabled/0
  30. ]).
  31. -type(acl_result() :: allow | deny).
  32. %% Wrappers for key and value
  33. cache_k(PubSub, Topic)-> {PubSub, Topic}.
  34. cache_v(AclResult)-> {AclResult, time_now()}.
  35. -spec(is_enabled() -> boolean()).
  36. is_enabled() ->
  37. application:get_env(emqx, enable_acl_cache, true).
  38. %% We'll cleanup the cache before repalcing an expired acl.
  39. -spec(get_acl_cache(publish | subscribe, emqx_topic:topic()) -> (acl_result() | not_found)).
  40. get_acl_cache(PubSub, Topic) ->
  41. case erlang:get(cache_k(PubSub, Topic)) of
  42. undefined -> not_found;
  43. {AclResult, CachedAt} ->
  44. if_expired(CachedAt,
  45. fun(false) ->
  46. AclResult;
  47. (true) ->
  48. cleanup_acl_cache(),
  49. not_found
  50. end)
  51. end.
  52. %% If the cache get full, and also the latest one
  53. %% is expired, then delete all the cache entries
  54. -spec(put_acl_cache(publish | subscribe, emqx_topic:topic(), acl_result()) -> ok).
  55. put_acl_cache(PubSub, Topic, AclResult) ->
  56. MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
  57. Size = get_cache_size(),
  58. if
  59. Size < MaxSize ->
  60. add_acl(PubSub, Topic, AclResult);
  61. Size =:= MaxSize ->
  62. NewestK = get_newest_key(),
  63. {_AclResult, CachedAt} = erlang:get(NewestK),
  64. if_expired(CachedAt,
  65. fun(true) ->
  66. % all cache expired, cleanup first
  67. empty_acl_cache(),
  68. add_acl(PubSub, Topic, AclResult);
  69. (false) ->
  70. % cache full, perform cache replacement
  71. evict_acl_cache(),
  72. add_acl(PubSub, Topic, AclResult)
  73. end)
  74. end.
  75. %% delete all the acl entries
  76. -spec(empty_acl_cache() -> ok).
  77. empty_acl_cache() ->
  78. map_acl_cache(fun({CacheK, _CacheV}) ->
  79. erlang:erase(CacheK)
  80. end),
  81. set_cache_size(0),
  82. keys_queue_set(queue:new()).
  83. %% delete the oldest acl entry
  84. -spec(evict_acl_cache() -> ok).
  85. evict_acl_cache() ->
  86. OldestK = keys_queue_out(),
  87. erlang:erase(OldestK),
  88. decr_cache_size().
  89. %% cleanup all the exipired cache entries
  90. -spec(cleanup_acl_cache() -> ok).
  91. cleanup_acl_cache() ->
  92. keys_queue_set(
  93. cleanup_acl(keys_queue_get())).
  94. get_oldest_key() ->
  95. keys_queue_pick(queue_front()).
  96. get_newest_key() ->
  97. keys_queue_pick(queue_rear()).
  98. get_cache_max_size() ->
  99. application:get_env(emqx, acl_cache_max_size, 32).
  100. get_cache_size() ->
  101. case erlang:get(acl_cache_size) of
  102. undefined -> 0;
  103. Size -> Size
  104. end.
  105. dump_acl_cache() ->
  106. map_acl_cache(fun(Cache) -> Cache end).
  107. map_acl_cache(Fun) ->
  108. [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
  109. orelse SubPub =:= subscribe].
  110. %%--------------------------------------------------------------------
  111. %% Internal functions
  112. %%--------------------------------------------------------------------
  113. add_acl(PubSub, Topic, AclResult) ->
  114. K = cache_k(PubSub, Topic),
  115. V = cache_v(AclResult),
  116. case erlang:get(K) of
  117. undefined -> add_new_acl(K, V);
  118. {_AclResult, _CachedAt} ->
  119. update_acl(K, V)
  120. end.
  121. add_new_acl(K, V) ->
  122. erlang:put(K, V),
  123. keys_queue_in(K),
  124. incr_cache_size().
  125. update_acl(K, V) ->
  126. erlang:put(K, V),
  127. keys_queue_update(K).
  128. cleanup_acl(KeysQ) ->
  129. case queue:out(KeysQ) of
  130. {{value, OldestK}, KeysQ2} ->
  131. {_AclResult, CachedAt} = erlang:get(OldestK),
  132. if_expired(CachedAt,
  133. fun(false) -> KeysQ;
  134. (true) ->
  135. erlang:erase(OldestK),
  136. decr_cache_size(),
  137. cleanup_acl(KeysQ2)
  138. end);
  139. {empty, KeysQ} -> KeysQ
  140. end.
  141. incr_cache_size() ->
  142. erlang:put(acl_cache_size, get_cache_size() + 1), ok.
  143. decr_cache_size() ->
  144. Size = get_cache_size(),
  145. if Size > 1 ->
  146. erlang:put(acl_cache_size, Size-1);
  147. Size =< 1 ->
  148. erlang:put(acl_cache_size, 0)
  149. end, ok.
  150. set_cache_size(N) ->
  151. erlang:put(acl_cache_size, N), ok.
  152. %%% Ordered Keys Q %%%
  153. keys_queue_in(Key) ->
  154. %% delete the key first if exists
  155. KeysQ = keys_queue_get(),
  156. keys_queue_set(queue:in(Key, KeysQ)).
  157. keys_queue_out() ->
  158. case queue:out(keys_queue_get()) of
  159. {{value, OldestK}, Q2} ->
  160. keys_queue_set(Q2), OldestK;
  161. {empty, _Q} ->
  162. undefined
  163. end.
  164. keys_queue_update(Key) ->
  165. NewKeysQ = keys_queue_remove(Key, keys_queue_get()),
  166. keys_queue_set(queue:in(Key, NewKeysQ)).
  167. keys_queue_pick(Pick) ->
  168. KeysQ = keys_queue_get(),
  169. case queue:is_empty(KeysQ) of
  170. true -> undefined;
  171. false -> Pick(KeysQ)
  172. end.
  173. keys_queue_remove(Key, KeysQ) ->
  174. queue:filter(fun
  175. (K) when K =:= Key -> false; (_) -> true
  176. end, KeysQ).
  177. keys_queue_set(KeysQ) ->
  178. erlang:put(acl_keys_q, KeysQ), ok.
  179. keys_queue_get() ->
  180. case erlang:get(acl_keys_q) of
  181. undefined -> queue:new();
  182. KeysQ -> KeysQ
  183. end.
  184. queue_front() -> fun queue:get/1.
  185. queue_rear() -> fun queue:get_r/1.
  186. time_now() -> erlang:system_time(millisecond).
  187. if_expired(CachedAt, Fun) ->
  188. TTL = application:get_env(emqx, acl_cache_ttl, 60000),
  189. Now = time_now(),
  190. if (CachedAt + TTL) =< Now ->
  191. Fun(true);
  192. true ->
  193. Fun(false)
  194. end.