Quellcode durchsuchen

feat(acl): Add possibility to remove all acl cache

Karol Kaczmarek vor 4 Jahren
Ursprung
Commit
c7b44caa1d
2 geänderte Dateien mit 44 neuen und 10 gelöschten Zeilen
  1. 20 10
      src/emqx_acl_cache.erl
  2. 24 0
      test/emqx_acl_cache_SUITE.erl

+ 20 - 10
src/emqx_acl_cache.erl

@@ -27,6 +27,7 @@
         , get_cache_max_size/0
         , get_cache_ttl/0
         , is_enabled/0
+        , drain_cache/0
         ]).
 
 %% export for test
@@ -47,6 +48,7 @@
 %% Wrappers for key and value
 cache_k(PubSub, Topic)-> {PubSub, Topic}.
 cache_v(AclResult)-> {AclResult, time_now()}.
+drain_k() -> {?MODULE, drain_timestamp}.
 
 -spec(is_enabled() -> boolean()).
 is_enabled() ->
@@ -86,10 +88,10 @@ get_acl_cache(PubSub, Topic) ->
 put_acl_cache(PubSub, Topic, AclResult) ->
     MaxSize = get_cache_max_size(), true = (MaxSize =/= 0),
     Size = get_cache_size(),
-    if
-        Size < MaxSize ->
+    case Size < MaxSize of
+        true ->
             add_acl(PubSub, Topic, AclResult);
-        Size =:= MaxSize ->
+        false ->
             NewestK = get_newest_key(),
             {_AclResult, CachedAt} = erlang:get(NewestK),
             if_expired(CachedAt,
@@ -145,6 +147,11 @@ foreach_acl_cache(Fun) ->
     _ = map_acl_cache(Fun),
     ok.
 
+%% All acl cache entries added before `drain_cache()` invocation will become expired
+drain_cache() ->
+    _ = persistent_term:put(drain_k(), time_now()),
+    ok.
+
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------
@@ -185,11 +192,14 @@ incr_cache_size() ->
     erlang:put(acl_cache_size, get_cache_size() + 1), ok.
 decr_cache_size() ->
     Size = get_cache_size(),
-    if Size > 1 ->
+    case Size > 1 of
+        true ->
           erlang:put(acl_cache_size, Size-1);
-       Size =< 1 ->
+        false ->
           erlang:put(acl_cache_size, 0)
-    end, ok.
+    end,
+    ok.
+
 set_cache_size(N) ->
     erlang:put(acl_cache_size, N), ok.
 
@@ -239,8 +249,8 @@ time_now() -> erlang:system_time(millisecond).
 if_expired(CachedAt, Fun) ->
     TTL = get_cache_ttl(),
     Now = time_now(),
-    if (CachedAt + TTL) =< Now ->
-           Fun(true);
-       true ->
-           Fun(false)
+    CurrentEvictTimestamp = persistent_term:get(drain_k(), 0),
+    case CachedAt =< CurrentEvictTimestamp orelse (CachedAt + TTL) =< Now of
+        true -> Fun(true);
+        false -> Fun(false)
     end.

+ 24 - 0
test/emqx_acl_cache_SUITE.erl

@@ -55,6 +55,30 @@ t_clean_acl_cache(_) ->
     ?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))),
     emqtt:stop(Client).
 
+
+t_drain_acl_cache(_) ->
+    {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
+    emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
+    ct:sleep(100),
+    ClientPid = case emqx_cm:lookup_channels(<<"emqx_c">>) of
+                    [Pid] when is_pid(Pid) ->
+                        Pid;
+                    Pids when is_list(Pids) ->
+                        lists:last(Pids);
+                    _ -> {error, not_found}
+                end,
+    Caches = gen_server:call(ClientPid, list_acl_cache),
+    ct:log("acl caches: ~p", [Caches]),
+    ?assert(length(Caches) > 0),
+    emqx_acl_cache:drain_cache(),
+    ?assertEqual(0, length(gen_server:call(ClientPid, list_acl_cache))),
+    ct:sleep(100),
+    {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
+    ?assert(length(gen_server:call(ClientPid, list_acl_cache)) > 0),
+    emqtt:stop(Client).
+
 % optimize??
 t_reload_aclfile_and_cleanall(_Config) ->