| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_acl_cache).
- -include("emqx.hrl").
- -export([ get_acl_cache/2
- , put_acl_cache/3
- , cleanup_acl_cache/0
- , empty_acl_cache/0
- , dump_acl_cache/0
- , get_cache_size/0
- , get_cache_max_size/0
- , get_newest_key/0
- , get_oldest_key/0
- , cache_k/2
- , cache_v/1
- , is_enabled/0
- ]).
- -type(acl_result() :: allow | deny).
%% Wrappers for key and value

%% Build the process-dictionary key under which the ACL result for
%% the given action (PubSub) and Topic is cached.
cache_k(PubSub, Topic) ->
    Key = {PubSub, Topic},
    Key.
%% Build the cached value: the ACL result paired with the time
%% (as returned by time_now/0) at which it was cached.
cache_v(AclResult) ->
    CachedAt = time_now(),
    {AclResult, CachedAt}.
%% Tell whether the ACL cache is switched on. Reads the `enable_acl_cache'
%% setting of the `emqx' application; defaults to `true' when it is not set.
-spec(is_enabled() -> boolean()).
is_enabled() ->
    Default = true,
    application:get_env(emqx, enable_acl_cache, Default).
%% Look up the cached ACL result for {PubSub, Topic} in the process
%% dictionary. Returns `not_found' when nothing is cached or when the
%% cached entry has expired; in the latter case the expired entries are
%% cleaned up before returning, so the caller can replace them.
-spec(get_acl_cache(publish | subscribe, emqx_topic:topic()) -> (acl_result() | not_found)).
get_acl_cache(PubSub, Topic) ->
    Key = cache_k(PubSub, Topic),
    case erlang:get(Key) of
        {AclResult, CachedAt} ->
            OnCheck = fun(true) ->
                              %% stale hit: purge expired entries, report a miss
                              cleanup_acl_cache(),
                              not_found;
                         (false) ->
                              AclResult
                      end,
            if_expired(CachedAt, OnCheck);
        undefined ->
            not_found
    end.
%% Store the ACL result for {PubSub, Topic} in the per-process cache.
%%
%% While the size counter is below the configured maximum, the entry is
%% simply added (or refreshed). Once the cache is full, the newest entry
%% decides what happens: if even that one has expired, all entries are
%% deleted; otherwise the oldest entries are evicted to make room.
%%
%% A size counter *above* the maximum (possible because the maximum is
%% re-read from the application environment on every call) is treated as
%% "full" rather than crashing with an `if_clause' error.
%%
%% Crashes with `badmatch' when the configured maximum size is 0.
-spec(put_acl_cache(publish | subscribe, emqx_topic:topic(), acl_result()) -> ok).
put_acl_cache(PubSub, Topic, AclResult) ->
    MaxSize = get_cache_max_size(),
    true = (MaxSize =/= 0),
    case get_cache_size() < MaxSize of
        true ->
            add_acl(PubSub, Topic, AclResult);
        false ->
            NewestK = get_newest_key(),
            {_AclResult, CachedAt} = erlang:get(NewestK),
            if_expired(CachedAt,
                       fun(true) ->
                               %% all cache expired, cleanup first
                               empty_acl_cache(),
                               add_acl(PubSub, Topic, AclResult);
                          (false) ->
                               %% cache full, perform cache replacement
                               evict_until_below(MaxSize),
                               add_acl(PubSub, Topic, AclResult)
                       end)
    end.

%% Evict the oldest entries until the size counter drops below MaxSize.
%% Stops as soon as the counter reaches 0 so it always terminates.
evict_until_below(MaxSize) ->
    Size = get_cache_size(),
    case Size > 0 andalso Size >= MaxSize of
        true ->
            evict_acl_cache(),
            evict_until_below(MaxSize);
        false ->
            ok
    end.
%% Delete every cached ACL entry from the process dictionary, then reset
%% the size counter to 0 and the ordered key queue to an empty queue.
-spec(empty_acl_cache() -> ok).
empty_acl_cache() ->
    EraseEntry = fun({Key, _Value}) -> erlang:erase(Key) end,
    _Erased = map_acl_cache(EraseEntry),
    set_cache_size(0),
    EmptyQ = queue:new(),
    keys_queue_set(EmptyQ).
%% Delete the oldest ACL entry: pop its key from the front of the key
%% queue, erase it from the process dictionary and decrement the size
%% counter.
-spec(evict_acl_cache() -> ok).
evict_acl_cache() ->
    erlang:erase(keys_queue_out()),
    decr_cache_size().
%% Clean up the expired cache entries: run cleanup_acl/1 over the current
%% key queue and store the queue it returns.
-spec(cleanup_acl_cache() -> ok).
cleanup_acl_cache() ->
    CurrentQ = keys_queue_get(),
    RemainingQ = cleanup_acl(CurrentQ),
    keys_queue_set(RemainingQ).
%% Return the key at the front of the key queue (the oldest one),
%% or `undefined' when the queue is empty.
get_oldest_key() ->
    PickFront = queue_front(),
    keys_queue_pick(PickFront).
%% Return the key at the rear of the key queue (the newest one),
%% or `undefined' when the queue is empty.
get_newest_key() ->
    PickRear = queue_rear(),
    keys_queue_pick(PickRear).
%% Read the maximum number of cached entries from the `acl_cache_max_size'
%% setting of the `emqx' application; defaults to 32 when it is not set.
get_cache_max_size() ->
    Default = 32,
    application:get_env(emqx, acl_cache_max_size, Default).
%% Return the size counter stored under `acl_cache_size' in the process
%% dictionary, or 0 when no counter has been stored yet.
get_cache_size() ->
    Stored = erlang:get(acl_cache_size),
    if
        Stored =:= undefined -> 0;
        true -> Stored
    end.
%% Return all cached ACL entries of the calling process as a list of
%% {Key, Value} pairs, exactly as stored in the process dictionary.
dump_acl_cache() ->
    Identity = fun(Entry) -> Entry end,
    map_acl_cache(Identity).
%% Apply Fun to every process-dictionary entry whose key is a 2-tuple
%% starting with `publish' or `subscribe', and collect the results.
%% Entries with any other key shape are skipped.
map_acl_cache(Fun) ->
    Visit = fun({{publish, _Topic}, _Value} = Entry) -> {true, Fun(Entry)};
               ({{subscribe, _Topic}, _Value} = Entry) -> {true, Fun(Entry)};
               (_Other) -> false
            end,
    lists:filtermap(Visit, erlang:get()).
- %%--------------------------------------------------------------------
- %% Internal functions
- %%--------------------------------------------------------------------
%% Cache AclResult for {PubSub, Topic}: refresh the entry when the key is
%% already present in the process dictionary, otherwise insert a new one.
add_acl(PubSub, Topic, AclResult) ->
    Key = cache_k(PubSub, Topic),
    Value = cache_v(AclResult),
    case erlang:get(Key) of
        {_OldResult, _OldCachedAt} ->
            update_acl(Key, Value);
        undefined ->
            add_new_acl(Key, Value)
    end.
%% Insert a new entry: store it in the process dictionary, append its key
%% to the rear of the key queue and increment the size counter.
add_new_acl(K, V) ->
    _Previous = erlang:put(K, V),
    keys_queue_in(K),
    incr_cache_size().
%% Overwrite an existing entry and move its key to the rear of the key
%% queue. The size counter is left untouched.
update_acl(K, V) ->
    _Previous = erlang:put(K, V),
    keys_queue_update(K).
%% Walk the key queue from the front (oldest key first), erasing entries
%% for as long as they are expired. Stops at the first entry that is
%% still valid and returns the queue from that point on; returns the
%% empty queue when every entry was expired.
cleanup_acl(KeysQ) ->
    case queue:out(KeysQ) of
        {empty, KeysQ} ->
            KeysQ;
        {{value, Key}, RestQ} ->
            {_AclResult, CachedAt} = erlang:get(Key),
            Step = fun(true) ->
                           erlang:erase(Key),
                           decr_cache_size(),
                           cleanup_acl(RestQ);
                      (false) ->
                           %% first live entry: keep it and everything behind it
                           KeysQ
                   end,
            if_expired(CachedAt, Step)
    end.
%% Increment the size counter kept in the process dictionary by one.
incr_cache_size() ->
    NewSize = get_cache_size() + 1,
    erlang:put(acl_cache_size, NewSize),
    ok.
%% Decrement the size counter kept in the process dictionary by one,
%% never letting it drop below 0.
decr_cache_size() ->
    NewSize = case get_cache_size() of
                  Size when Size > 1 -> Size - 1;
                  _AtMostOne -> 0
              end,
    erlang:put(acl_cache_size, NewSize),
    ok.
%% Overwrite the size counter kept in the process dictionary with N.
set_cache_size(N) ->
    _Previous = erlang:put(acl_cache_size, N),
    ok.
%%% Ordered Keys Q %%%

%% Append Key to the rear of the key queue and store the new queue.
%% The key is appended as-is; no existing occurrence is removed here.
keys_queue_in(Key) ->
    keys_queue_set(queue:in(Key, keys_queue_get())).
%% Pop the key at the front of the key queue (the oldest one), store the
%% shortened queue and return the key. Returns `undefined' and leaves the
%% stored queue untouched when the queue is empty.
keys_queue_out() ->
    case queue:out(keys_queue_get()) of
        {empty, _Q} ->
            undefined;
        {{value, Oldest}, RestQ} ->
            _ = keys_queue_set(RestQ),
            Oldest
    end.
%% Move Key to the rear of the key queue: drop every occurrence of it
%% from the queue, then append it again and store the result.
keys_queue_update(Key) ->
    WithoutKey = keys_queue_remove(Key, keys_queue_get()),
    Requeued = queue:in(Key, WithoutKey),
    keys_queue_set(Requeued).
%% Apply Pick (a fun taking a queue) to the current key queue.
%% Returns `undefined' without calling Pick when the queue is empty.
keys_queue_pick(Pick) ->
    Q = keys_queue_get(),
    case queue:is_empty(Q) of
        false -> Pick(Q);
        true -> undefined
    end.
%% Return KeysQ without any element that is exactly equal (=:=) to Key.
%% The relative order of the remaining elements is preserved.
keys_queue_remove(Key, KeysQ) ->
    Keep = fun(Item) -> Item =/= Key end,
    queue:filter(Keep, KeysQ).
%% Store KeysQ as the key queue of the calling process (process
%% dictionary key `acl_keys_q').
keys_queue_set(KeysQ) ->
    _Previous = erlang:put(acl_keys_q, KeysQ),
    ok.
%% Fetch the key queue of the calling process from the process
%% dictionary; a fresh empty queue is returned when none is stored.
keys_queue_get() ->
    Stored = erlang:get(acl_keys_q),
    if
        Stored =:= undefined -> queue:new();
        true -> Stored
    end.
%% Return a fun that yields the front element of a non-empty queue.
queue_front() ->
    fun(Q) -> queue:get(Q) end.
%% Return a fun that yields the rear element of a non-empty queue.
queue_rear() ->
    fun(Q) -> queue:get_r(Q) end.
%% Current Erlang system time in milliseconds.
time_now() ->
    Unit = millisecond,
    erlang:system_time(Unit).
%% Decide whether an entry cached at CachedAt has outlived the TTL and
%% hand the verdict to Fun: Fun(true) when CachedAt + TTL =< now,
%% Fun(false) otherwise. The TTL is read from the `acl_cache_ttl' setting
%% of the `emqx' application and defaults to 60000 when it is not set.
%% Returns whatever Fun returns.
if_expired(CachedAt, Fun) ->
    TTL = application:get_env(emqx, acl_cache_ttl, 60000),
    Now = time_now(),
    %% The comparison stays inside a guard so that a failing arithmetic
    %% expression counts as "not expired" instead of raising.
    Expired = if
                  CachedAt + TTL =< Now -> true;
                  true -> false
              end,
    Fun(Expired).
|