Explorar o código

feat: support authz cache exclusion config

now one can configure a list of topic-filters to avoid
caching ACL check results

for example

authorization.cache.excludes = ["nocache/#"]

this means ACL check results for topics having 'nocache/' prefix
will not be cached
Zaiming (Stone) Shi %!s(int64=2) %!d(string=hai) anos
pai
achega
9e8a67fd68

+ 1 - 1
Makefile

@@ -317,7 +317,7 @@ $(foreach tt,$(ALL_ELIXIR_TGZS),$(eval $(call gen-elixir-tgz-target,$(tt))))
 fmt: $(REBAR)
 	@$(SCRIPTS)/erlfmt -w 'apps/*/{src,include,priv,test,integration_test}/**/*.{erl,hrl,app.src,eterm}'
 	@$(SCRIPTS)/erlfmt -w '**/*.escript' --exclude-files '_build/**'
-	@$(SCRIPTS)/erlfmt -w '**/rebar.config'
+	@$(SCRIPTS)/erlfmt -w '**/rebar.config' --exclude-files '_build/**'
 	@$(SCRIPTS)/erlfmt -w 'rebar.config.erl'
 	@$(SCRIPTS)/erlfmt -w 'bin/nodetool'
 	@mix format

+ 1 - 1
apps/emqx/src/emqx_access_control.erl

@@ -102,7 +102,7 @@ authorize(ClientInfo, Action, <<"$delayed/", Data/binary>> = RawTopic) ->
     end;
 authorize(ClientInfo, Action, Topic) ->
     Result =
-        case emqx_authz_cache:is_enabled() of
+        case emqx_authz_cache:is_enabled(Topic) of
             true -> check_authorization_cache(ClientInfo, Action, Topic);
             false -> do_authorize(ClientInfo, Action, Topic)
         end,

+ 20 - 9
apps/emqx/src/emqx_authz_cache.erl

@@ -24,10 +24,9 @@
     put_authz_cache/3,
     cleanup_authz_cache/0,
     empty_authz_cache/0,
-    dump_authz_cache/0,
     get_cache_max_size/0,
     get_cache_ttl/0,
-    is_enabled/0,
+    is_enabled/1,
     drain_cache/0,
     drain_cache/1
 ]).
@@ -53,9 +52,20 @@ cache_k(PubSub, Topic) -> {PubSub, Topic}.
 cache_v(AuthzResult) -> {AuthzResult, time_now()}.
 drain_k() -> {?MODULE, drain_timestamp}.
 
--spec is_enabled() -> boolean().
-is_enabled() ->
-    emqx:get_config([authorization, cache, enable], false).
+%% @doc Check if the authz cache is enabled for the given topic.
+-spec is_enabled(emqx_types:topic()) -> boolean().
+is_enabled(Topic) ->
+    case emqx:get_config([authorization, cache]) of
+        #{enable := true, excludes := Filters} ->
+            not is_excluded(Topic, Filters);
+        #{enable := IsEnabled} ->
+            IsEnabled
+    end.
+
+is_excluded(_Topic, []) ->
+    false;
+is_excluded(Topic, [Filter | Filters]) ->
+    emqx_topic:match(Topic, Filter) orelse is_excluded(Topic, Filters).
 
 -spec get_cache_max_size() -> integer().
 get_cache_max_size() ->
@@ -153,14 +163,15 @@ get_cache_size() ->
         Size -> Size
     end.
 
-dump_authz_cache() ->
-    map_authz_cache(fun(Cache) -> Cache end).
-
 map_authz_cache(Fun) ->
+    map_authz_cache(Fun, erlang:get()).
+
+map_authz_cache(Fun, Dict) ->
     [
         Fun(R)
-     || R = {{?authz_action, _T}, _Authz} <- erlang:get()
+     || R = {{?authz_action, _T}, _Authz} <- Dict
     ].
+
 foreach_authz_cache(Fun) ->
     _ = map_authz_cache(Fun),
     ok.

+ 12 - 7
apps/emqx/src/emqx_schema.erl

@@ -439,9 +439,9 @@ fields("stats") ->
     ];
 fields("authorization") ->
     authz_fields();
-fields("authz_cache") ->
+fields(authz_cache) ->
     [
-        {"enable",
+        {enable,
             sc(
                 boolean(),
                 #{
@@ -450,7 +450,7 @@ fields("authz_cache") ->
                     desc => ?DESC(fields_cache_enable)
                 }
             )},
-        {"max_size",
+        {max_size,
             sc(
                 range(1, 1048576),
                 #{
@@ -458,14 +458,19 @@ fields("authz_cache") ->
                     desc => ?DESC(fields_cache_max_size)
                 }
             )},
-        {"ttl",
+        {ttl,
             sc(
                 duration(),
                 #{
                     default => <<"1m">>,
                     desc => ?DESC(fields_cache_ttl)
                 }
-            )}
+            )},
+        {excludes,
+            sc(hoconsc:array(string()), #{
+                default => [],
+                desc => ?DESC(fields_authz_cache_excludes)
+            })}
     ];
 fields("mqtt") ->
     mqtt_general() ++ mqtt_session();
@@ -1994,7 +1999,7 @@ desc("authorization") ->
     "Settings for client authorization.";
 desc("mqtt") ->
     "Global MQTT configuration.";
-desc("authz_cache") ->
+desc(authz_cache) ->
     "Settings for the authorization cache.";
 desc("zone") ->
     "A `Zone` defines a set of configuration items (such as the maximum number of connections)"
@@ -2556,7 +2561,7 @@ authz_fields() ->
             )},
         {"cache",
             sc(
-                ref(?MODULE, "authz_cache"),
+                ref(?MODULE, authz_cache),
                 #{}
             )}
     ].

+ 36 - 23
apps/emqx/test/emqx_authz_cache_SUITE.erl

@@ -20,11 +20,13 @@
 -compile(nowarn_export_all).
 
 -include_lib("eunit/include/eunit.hrl").
+-include_lib("snabbkaffe/include/snabbkaffe.hrl").
 
 all() -> emqx_common_test_helpers:all(?MODULE).
 
 init_per_suite(Config) ->
     Apps = emqx_cth_suite:start([emqx], #{work_dir => emqx_cth_suite:work_dir(Config)}),
+    emqx_config:put([authorization, cache, excludes], [<<"nocache/#">>]),
     [{apps, Apps} | Config].
 
 end_per_suite(Config) ->
@@ -34,24 +36,27 @@ end_per_suite(Config) ->
 %% Test cases
 %%--------------------------------------------------------------------
 
+t_cache_exclude(_) ->
+    ClientId = <<"test-id1">>,
+    {ok, Client} = emqtt:start_link([{clientid, ClientId}]),
+    {ok, _} = emqtt:connect(Client),
+    {ok, _, _} = emqtt:subscribe(Client, <<"nocache/+/#">>, 0),
+    emqtt:publish(Client, <<"nocache/1">>, <<"{\"x\":1}">>, 0),
+    Caches = list_cache(ClientId),
+    ?assertEqual([], Caches),
+    emqtt:stop(Client).
+
 t_clean_authz_cache(_) ->
     {ok, Client} = emqtt:start_link([{clientid, <<"emqx_c">>}]),
     {ok, _} = emqtt:connect(Client),
     {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
     emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
-    ct:sleep(100),
-    ClientPid =
-        case emqx_cm:lookup_channels(<<"emqx_c">>) of
-            Pids when is_list(Pids) ->
-                lists:last(Pids);
-            _ ->
-                {error, not_found}
-        end,
-    Caches = gen_server:call(ClientPid, list_authz_cache),
+    ClientPid = find_client_pid(<<"emqx_c">>),
+    Caches = list_cache(ClientPid),
     ct:log("authz caches: ~p", [Caches]),
     ?assert(length(Caches) > 0),
     erlang:send(ClientPid, clean_authz_cache),
-    ?assertEqual(0, length(gen_server:call(ClientPid, list_authz_cache))),
+    ?assertEqual([], list_cache(ClientPid)),
     emqtt:stop(Client).
 
 t_drain_authz_cache(_) ->
@@ -59,22 +64,30 @@ t_drain_authz_cache(_) ->
     {ok, _} = emqtt:connect(Client),
     {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
     emqtt:publish(Client, <<"t1">>, <<"{\"x\":1}">>, 0),
-    ct:sleep(100),
-    ClientPid =
-        case emqx_cm:lookup_channels(<<"emqx_c">>) of
-            [Pid] when is_pid(Pid) ->
-                Pid;
-            Pids when is_list(Pids) ->
-                lists:last(Pids);
-            _ ->
-                {error, not_found}
-        end,
-    Caches = gen_server:call(ClientPid, list_authz_cache),
+    ClientPid = find_client_pid(<<"emqx_c">>),
+    Caches = list_cache(ClientPid),
     ct:log("authz caches: ~p", [Caches]),
     ?assert(length(Caches) > 0),
     emqx_authz_cache:drain_cache(),
-    ?assertEqual(0, length(gen_server:call(ClientPid, list_authz_cache))),
+    ?assertEqual([], list_cache(ClientPid)),
     ct:sleep(100),
     {ok, _, _} = emqtt:subscribe(Client, <<"t2">>, 0),
-    ?assert(length(gen_server:call(ClientPid, list_authz_cache)) > 0),
+    ?assert(length(list_cache(ClientPid)) > 0),
     emqtt:stop(Client).
+
+list_cache(ClientId) when is_binary(ClientId) ->
+    ClientPid = find_client_pid(ClientId),
+    list_cache(ClientPid);
+list_cache(ClientPid) ->
+    gen_server:call(ClientPid, list_authz_cache).
+
+find_client_pid(ClientId) ->
+    ?retry(_Inteval = 100, _Attempts = 10, do_find_client_pid(ClientId)).
+
+do_find_client_pid(ClientId) ->
+    case emqx_cm:lookup_channels(ClientId) of
+        Pids when is_list(Pids) ->
+            lists:last(Pids);
+        _ ->
+            throw({not_found, ClientId})
+    end.

+ 5 - 2
apps/emqx_auth/test/emqx_authz/emqx_authz_api_settings_SUITE.erl

@@ -70,7 +70,8 @@ t_api(_) ->
         <<"cache">> => #{
             <<"enable">> => false,
             <<"max_size">> => 32,
-            <<"ttl">> => <<"60s">>
+            <<"ttl">> => <<"60s">>,
+            <<"excludes">> => [<<"nocache/#">>]
         }
     },
 
@@ -90,7 +91,9 @@ t_api(_) ->
 
     {ok, 200, Result2} = request(put, uri(["authorization", "settings"]), Settings2),
     {ok, 200, Result2} = request(get, uri(["authorization", "settings"]), []),
-    ?assertEqual(Settings2, emqx_utils_json:decode(Result2)),
+    Cache = maps:get(<<"cache">>, Settings2),
+    ExpectedSettings2 = Settings2#{<<"cache">> => Cache#{<<"excludes">> => []}},
+    ?assertEqual(ExpectedSettings2, emqx_utils_json:decode(Result2)),
 
     ok.
 

+ 3 - 0
changes/ce/feat-12289.en.md

@@ -0,0 +1,3 @@
+Add new config `authorization.cache.excludes` to support ACL cache exclusion.
+
+When configured with a list of topic-filters, the publish or subscribe permission check results for a matching topic or topic filter will not be cached.

+ 6 - 0
rel/i18n/emqx_schema.hocon

@@ -920,6 +920,12 @@ fields_cache_ttl.desc:
 fields_cache_ttl.label:
 """Time to live for the cached data."""
 
+fields_authz_cache_excludes.label:
+"""Excludes"""
+
+fields_authz_cache_excludes.desc:
+"""Exclude caching ACL check results for topics matching the given patterns."""
+
 sys_topics.desc:
 """System topics configuration."""