Просмотр исходного кода

Add API for clean and get acl cache

terry-xiaoyu 6 лет назад
Родитель
Сommit
c1d768ff74
3 измененных файлов с 61 добавлено и 13 удалено
  1. 33 13
      src/emqx_acl_cache.erl
  2. 8 0
      src/emqx_channel.erl
  3. 20 0
      test/emqx_connection_SUITE.erl

+ 33 - 13
src/emqx_acl_cache.erl

@@ -18,21 +18,31 @@
 
 -include("emqx.hrl").
 
--export([ get_acl_cache/2
+-export([ list_acl_cache/0
+        , get_acl_cache/2
         , put_acl_cache/3
         , cleanup_acl_cache/0
         , empty_acl_cache/0
         , dump_acl_cache/0
-        , get_cache_size/0
         , get_cache_max_size/0
+        , get_cache_ttl/0
+        , is_enabled/0
+        ]).
+
+%% export for test
+-export([ cache_k/2
+        , cache_v/1
+        , get_cache_size/0
         , get_newest_key/0
         , get_oldest_key/0
-        , cache_k/2
-        , cache_v/1
-        , is_enabled/0
         ]).
 
 -type(acl_result() :: allow | deny).
+-type(system_time() :: integer()).
+-type(cache_key() :: {emqx_types:pubsub(), emqx_types:topic()}).
+-type(cache_val() :: {acl_result(), system_time()}).
+
+-type(acl_cache_entry() :: {cache_key(), cache_val()}).
 
 %% Wrappers for key and value
 cache_k(PubSub, Topic)-> {PubSub, Topic}.
@@ -42,8 +52,21 @@ cache_v(AclResult)-> {AclResult, time_now()}.
 is_enabled() ->
     application:get_env(emqx, enable_acl_cache, true).
 
-%% We'll cleanup the cache before repalcing an expired acl.
--spec(get_acl_cache(publish | subscribe, emqx_topic:topic()) -> (acl_result() | not_found)).
+-spec(get_cache_max_size() -> integer()).
+get_cache_max_size() ->
+    application:get_env(emqx, acl_cache_max_size, 32).
+
+-spec(get_cache_ttl() -> integer()).
+get_cache_ttl() ->
+     application:get_env(emqx, acl_cache_ttl, 60000).
+
+-spec(list_acl_cache() -> [acl_cache_entry()]).
+list_acl_cache() ->
+    cleanup_acl_cache(),
+    map_acl_cache(fun(Cache) -> Cache end).
+
+%% We'll cleanup the cache before replacing an expired acl.
+-spec(get_acl_cache(emqx_types:pubsub(), emqx_topic:topic()) -> (acl_result() | not_found)).
 get_acl_cache(PubSub, Topic) ->
     case erlang:get(cache_k(PubSub, Topic)) of
         undefined -> not_found;
@@ -59,7 +82,7 @@ get_acl_cache(PubSub, Topic) ->
 
 %% If the cache get full, and also the latest one
 %%   is expired, then delete all the cache entries
--spec(put_acl_cache(publish | subscribe, emqx_topic:topic(), acl_result()) -> ok).
+-spec(put_acl_cache(emqx_types:pubsub(), emqx_topic:topic(), acl_result()) -> ok).
 put_acl_cache(PubSub, Topic, AclResult) ->
     MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
     Size = get_cache_size(),
@@ -97,7 +120,7 @@ evict_acl_cache() ->
     erlang:erase(OldestK),
     decr_cache_size().
 
-%% cleanup all the exipired cache entries
+%% cleanup all the expired cache entries
 -spec(cleanup_acl_cache() -> ok).
 cleanup_acl_cache() ->
     keys_queue_set(
@@ -108,9 +131,6 @@ get_oldest_key() ->
 get_newest_key() ->
     keys_queue_pick(queue_rear()).
 
-get_cache_max_size() ->
-    application:get_env(emqx, acl_cache_max_size, 32).
-
 get_cache_size() ->
     case erlang:get(acl_cache_size) of
         undefined -> 0;
@@ -215,7 +235,7 @@ queue_rear() -> fun queue:get_r/1.
 time_now() -> erlang:system_time(millisecond).
 
 if_expired(CachedAt, Fun) ->
-    TTL = application:get_env(emqx, acl_cache_ttl, 60000),
+    TTL = get_cache_ttl(),
     Now = time_now(),
     if (CachedAt + TTL) =< Now ->
            Fun(true);

+ 8 - 0
src/emqx_channel.erl

@@ -705,6 +705,9 @@ handle_call({takeover, 'end'}, Channel = #channel{session  = Session,
     AllPendings = lists:append(Delivers, Pendings),
     {stop, {shutdown, takeovered}, AllPendings, Channel};
 
+handle_call(list_acl_cache, Channel) ->
+    {reply, emqx_acl_cache:list_acl_cache(), Channel};
+
 handle_call(Req, Channel) ->
     ?LOG(error, "Unexpected call: ~p", [Req]),
     {reply, ignored, Channel}.
@@ -757,6 +760,11 @@ handle_info({sock_closed, Reason}, Channel = #channel{conninfo = ConnInfo,
             shutdown(Reason, Channel2)
     end;
 
+handle_info(clean_acl_cache, Channel) ->
+    ?LOG(debug, "clear acl cache"),
+    ok = emqx_acl_cache:empty_acl_cache(),
+    {ok, Channel};
+
 handle_info(Info, Channel) ->
     ?LOG(error, "Unexpected info: ~p~n", [Info]),
     error(unexpected_info),

+ 20 - 0
test/emqx_connection_SUITE.erl

@@ -31,6 +31,26 @@ init_per_suite(Config) ->
 end_per_suite(_Config) ->
     emqx_ct_helpers:stop_apps([]).
 
+t_clean_acl_cache(_Config) ->
+    {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
+    emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
+    ct:sleep(100),
+    ClientPid = case emqx_cm:lookup_channels(<<"emqx_c">>) of
+        [Pid] when is_pid(Pid) ->
+            Pid;
+        Pids when is_list(Pids) ->
+            lists:last(Pids);
+        _ -> {error, not_found}
+    end,
+    Caches = gen_server:call(ClientPid, list_acl_cache),
+    ct:log("acl caches: ~p", [Caches]),
+    ?assert(length(Caches) > 0),
+    erlang:send(ClientPid, clean_acl_cache),
+    ?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))),
+    emqtt:stop(Client).
+
 t_basic(_) ->
     Topic = <<"TopicA">>,
     {ok, C} = emqtt:start_link([{port, 1883}, {clientid, <<"hello">>}]),