| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_acl_cache).
- -include("emqx.hrl").
- -export([ list_acl_cache/0
- , get_acl_cache/2
- , put_acl_cache/3
- , cleanup_acl_cache/0
- , empty_acl_cache/0
- , dump_acl_cache/0
- , get_cache_max_size/0
- , get_cache_ttl/0
- , is_enabled/0
- , drain_cache/0
- ]).
- %% export for test
- -export([ cache_k/2
- , cache_v/1
- , get_cache_size/0
- , get_newest_key/0
- , get_oldest_key/0
- ]).
- -type(acl_result() :: allow | deny).
- -type(system_time() :: integer()).
- -type(cache_key() :: {emqx_types:pubsub(), emqx_types:topic()}).
- -type(cache_val() :: {acl_result(), system_time()}).
- -type(acl_cache_entry() :: {cache_key(), cache_val()}).
- %% Wrappers for key and value
- cache_k(PubSub, Topic)-> {PubSub, Topic}.
- cache_v(AclResult)-> {AclResult, time_now()}.
- drain_k() -> {?MODULE, drain_timestamp}.
- -spec(is_enabled() -> boolean()).
- is_enabled() ->
- application:get_env(emqx, enable_acl_cache, true).
- -spec(get_cache_max_size() -> integer()).
- get_cache_max_size() ->
- application:get_env(emqx, acl_cache_max_size, 32).
- -spec(get_cache_ttl() -> integer()).
- get_cache_ttl() ->
- application:get_env(emqx, acl_cache_ttl, 60000).
- -spec(list_acl_cache() -> [acl_cache_entry()]).
- list_acl_cache() ->
- cleanup_acl_cache(),
- map_acl_cache(fun(Cache) -> Cache end).
- %% We'll cleanup the cache before replacing an expired acl.
- -spec(get_acl_cache(emqx_types:pubsub(), emqx_topic:topic()) -> (acl_result() | not_found)).
- get_acl_cache(PubSub, Topic) ->
- case erlang:get(cache_k(PubSub, Topic)) of
- undefined -> not_found;
- {AclResult, CachedAt} ->
- if_expired(CachedAt,
- fun(false) ->
- AclResult;
- (true) ->
- cleanup_acl_cache(),
- not_found
- end)
- end.
- %% If the cache get full, and also the latest one
- %% is expired, then delete all the cache entries
- -spec(put_acl_cache(emqx_types:pubsub(), emqx_topic:topic(), acl_result()) -> ok).
- put_acl_cache(PubSub, Topic, AclResult) ->
- MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
- Size = get_cache_size(),
- case Size < MaxSize of
- true ->
- add_acl(PubSub, Topic, AclResult);
- false ->
- NewestK = get_newest_key(),
- {_AclResult, CachedAt} = erlang:get(NewestK),
- if_expired(CachedAt,
- fun(true) ->
- % all cache expired, cleanup first
- empty_acl_cache(),
- add_acl(PubSub, Topic, AclResult);
- (false) ->
- % cache full, perform cache replacement
- evict_acl_cache(),
- add_acl(PubSub, Topic, AclResult)
- end)
- end.
- %% delete all the acl entries
- -spec(empty_acl_cache() -> ok).
- empty_acl_cache() ->
- foreach_acl_cache(fun({CacheK, _CacheV}) -> erlang:erase(CacheK) end),
- set_cache_size(0),
- keys_queue_set(queue:new()).
- %% delete the oldest acl entry
- -spec(evict_acl_cache() -> ok).
- evict_acl_cache() ->
- OldestK = keys_queue_out(),
- erlang:erase(OldestK),
- decr_cache_size().
- %% cleanup all the expired cache entries
- -spec(cleanup_acl_cache() -> ok).
- cleanup_acl_cache() ->
- keys_queue_set(
- cleanup_acl(keys_queue_get())).
- get_oldest_key() ->
- keys_queue_pick(queue_front()).
- get_newest_key() ->
- keys_queue_pick(queue_rear()).
- get_cache_size() ->
- case erlang:get(acl_cache_size) of
- undefined -> 0;
- Size -> Size
- end.
- dump_acl_cache() ->
- map_acl_cache(fun(Cache) -> Cache end).
- map_acl_cache(Fun) ->
- [Fun(R) || R = {{SubPub, _T}, _Acl} <- get(), SubPub =:= publish
- orelse SubPub =:= subscribe].
- foreach_acl_cache(Fun) ->
- _ = map_acl_cache(Fun),
- ok.
- %% All acl cache entries added before `drain_cache()` invocation will become expired
- drain_cache() ->
- _ = persistent_term:put(drain_k(), time_now()),
- ok.
- %%--------------------------------------------------------------------
- %% Internal functions
- %%--------------------------------------------------------------------
- add_acl(PubSub, Topic, AclResult) ->
- K = cache_k(PubSub, Topic),
- V = cache_v(AclResult),
- case erlang:get(K) of
- undefined -> add_new_acl(K, V);
- {_AclResult, _CachedAt} ->
- update_acl(K, V)
- end.
- add_new_acl(K, V) ->
- erlang:put(K, V),
- keys_queue_in(K),
- incr_cache_size().
- update_acl(K, V) ->
- erlang:put(K, V),
- keys_queue_update(K).
- cleanup_acl(KeysQ) ->
- case queue:out(KeysQ) of
- {{value, OldestK}, KeysQ2} ->
- {_AclResult, CachedAt} = erlang:get(OldestK),
- if_expired(CachedAt,
- fun(false) -> KeysQ;
- (true) ->
- erlang:erase(OldestK),
- decr_cache_size(),
- cleanup_acl(KeysQ2)
- end);
- {empty, KeysQ} -> KeysQ
- end.
- incr_cache_size() ->
- erlang:put(acl_cache_size, get_cache_size() + 1), ok.
- decr_cache_size() ->
- Size = get_cache_size(),
- case Size > 1 of
- true ->
- erlang:put(acl_cache_size, Size-1);
- false ->
- erlang:put(acl_cache_size, 0)
- end,
- ok.
- set_cache_size(N) ->
- erlang:put(acl_cache_size, N), ok.
- %%% Ordered Keys Q %%%
- keys_queue_in(Key) ->
- %% delete the key first if exists
- KeysQ = keys_queue_get(),
- keys_queue_set(queue:in(Key, KeysQ)).
- keys_queue_out() ->
- case queue:out(keys_queue_get()) of
- {{value, OldestK}, Q2} ->
- keys_queue_set(Q2), OldestK;
- {empty, _Q} ->
- undefined
- end.
- keys_queue_update(Key) ->
- NewKeysQ = keys_queue_remove(Key, keys_queue_get()),
- keys_queue_set(queue:in(Key, NewKeysQ)).
- keys_queue_pick(Pick) ->
- KeysQ = keys_queue_get(),
- case queue:is_empty(KeysQ) of
- true -> undefined;
- false -> Pick(KeysQ)
- end.
- keys_queue_remove(Key, KeysQ) ->
- queue:filter(fun
- (K) when K =:= Key -> false; (_) -> true
- end, KeysQ).
- keys_queue_set(KeysQ) ->
- erlang:put(acl_keys_q, KeysQ), ok.
- keys_queue_get() ->
- case erlang:get(acl_keys_q) of
- undefined -> queue:new();
- KeysQ -> KeysQ
- end.
- queue_front() -> fun queue:get/1.
- queue_rear() -> fun queue:get_r/1.
- time_now() -> erlang:system_time(millisecond).
- if_expired(CachedAt, Fun) ->
- TTL = get_cache_ttl(),
- Now = time_now(),
- CurrentEvictTimestamp = persistent_term:get(drain_k(), 0),
- case CachedAt =< CurrentEvictTimestamp orelse (CachedAt + TTL) =< Now of
- true -> Fun(true);
- false -> Fun(false)
- end.
|