Przeglądaj źródła

fix(authz): correctly identify qos of subscribe actions

Ilya Averyanov 2 lat temu
rodzic
commit
deaac9bd73

+ 22 - 0
apps/emqx/include/asserts.hrl

@@ -60,6 +60,28 @@
     end)()
 ).
 
+-define(assertNotReceive(PATTERN),
+    ?assertNotReceive(PATTERN, 300)
+).
+
+-define(assertNotReceive(PATTERN, TIMEOUT),
+    (fun() ->
+        receive
+            X__V = PATTERN ->
+                erlang:error(
+                    {assertNotReceive, [
+                        {module, ?MODULE},
+                        {line, ?LINE},
+                        {expression, (??PATTERN)},
+                        {message, X__V}
+                    ]}
+                )
+        after TIMEOUT ->
+            ok
+        end
+    end)()
+).
+
 -define(retrying(CONFIG, NUM_RETRIES, TEST_BODY_FN), begin
     __TEST_CASE = ?FUNCTION_NAME,
     (fun

+ 8 - 11
apps/emqx/src/emqx_channel.erl

@@ -492,7 +492,7 @@ handle_in(
         ok ->
             TopicFilters0 = parse_topic_filters(TopicFilters),
             TopicFilters1 = enrich_subopts_subid(Properties, TopicFilters0),
-            TupleTopicFilters0 = check_sub_authzs(SubPkt, TopicFilters1, Channel),
+            TupleTopicFilters0 = check_sub_authzs(TopicFilters1, Channel),
             HasAuthzDeny = lists:any(
                 fun({_TopicFilter, ReasonCode}) ->
                     ReasonCode =:= ?RC_NOT_AUTHORIZED
@@ -1846,9 +1846,7 @@ authz_action(#mqtt_packet{
     header = #mqtt_packet_header{qos = QoS, retain = Retain}, variable = #mqtt_packet_publish{}
 }) ->
     ?AUTHZ_PUBLISH(QoS, Retain);
-authz_action(#mqtt_packet{
-    header = #mqtt_packet_header{qos = QoS}, variable = #mqtt_packet_subscribe{}
-}) ->
+authz_action({_Topic, #{qos := QoS} = _SubOpts} = _TopicFilter) ->
     ?AUTHZ_SUBSCRIBE(QoS);
 %% Will message
 authz_action(#message{qos = QoS, flags = #{retain := Retain}}) ->
@@ -1889,23 +1887,22 @@ check_pub_caps(
 %%--------------------------------------------------------------------
 %% Check Sub Authorization
 
-check_sub_authzs(Packet, TopicFilters, Channel) ->
-    Action = authz_action(Packet),
-    check_sub_authzs(Action, TopicFilters, Channel, []).
+check_sub_authzs(TopicFilters, Channel) ->
+    check_sub_authzs(TopicFilters, Channel, []).
 
 check_sub_authzs(
-    Action,
     [TopicFilter = {Topic, _} | More],
     Channel = #channel{clientinfo = ClientInfo},
     Acc
 ) ->
+    Action = authz_action(TopicFilter),
     case emqx_access_control:authorize(ClientInfo, Action, Topic) of
         allow ->
-            check_sub_authzs(Action, More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
+            check_sub_authzs(More, Channel, [{TopicFilter, ?RC_SUCCESS} | Acc]);
         deny ->
-            check_sub_authzs(Action, More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
+            check_sub_authzs(More, Channel, [{TopicFilter, ?RC_NOT_AUTHORIZED} | Acc])
     end;
-check_sub_authzs(_Action, [], _Channel, Acc) ->
+check_sub_authzs([], _Channel, Acc) ->
     lists:reverse(Acc).
 
 %%--------------------------------------------------------------------

+ 1 - 2
apps/emqx/test/emqx_channel_SUITE.erl

@@ -908,8 +908,7 @@ t_check_pub_alias(_) ->
 t_check_sub_authzs(_) ->
     emqx_config:put_zone_conf(default, [authorization, enable], true),
     TopicFilter = {<<"t">>, ?DEFAULT_SUBOPTS},
-    Subscribe = ?SUBSCRIBE_PACKET(1, [TopicFilter]),
-    [{TopicFilter, 0}] = emqx_channel:check_sub_authzs(Subscribe, [TopicFilter], channel()).
+    [{TopicFilter, 0}] = emqx_channel:check_sub_authzs([TopicFilter], channel()).
 
 t_enrich_connack_caps(_) ->
     ok = meck:new(emqx_mqtt_caps, [passthrough, no_history]),

+ 138 - 0
apps/emqx_authz/test/emqx_authz_rich_actions_SUITE.erl

@@ -0,0 +1,138 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2023 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% @doc Test suite verifies that MQTT retain and qos parameters
+%% correctly reach the authorization.
+%%--------------------------------------------------------------------
+
+-module(emqx_authz_rich_actions_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+-include_lib("emqx/include/asserts.hrl").
+
+all() ->
+    emqx_common_test_helpers:all(?MODULE).
+
+groups() ->
+    [].
+
+init_per_testcase(TestCase, Config) ->
+    Apps = emqx_cth_suite:start(
+        [
+            {emqx_conf, "authorization.no_match = deny, authorization.cache.enable = false"},
+            emqx_authz
+        ],
+        #{work_dir => filename:join(?config(priv_dir, Config), TestCase)}
+    ),
+    [{tc_apps, Apps} | Config].
+
+end_per_testcase(_TestCase, Config) ->
+    emqx_cth_suite:stop(?config(tc_apps, Config)),
+    _ = emqx_authz:set_feature_available(rich_actions, true).
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_rich_actions_subscribe(_Config) ->
+    ok = setup_config(#{
+        <<"type">> => <<"file">>,
+        <<"enable">> => true,
+        <<"rules">> =>
+            <<
+                "{allow, {user, \"username\"}, {subscribe, [{qos, 1}]}, [\"t1\"]}."
+                "\n{allow, {user, \"username\"}, {subscribe, [{qos, 2}]}, [\"t2\"]}."
+            >>
+    }),
+
+    {ok, C} = emqtt:start_link([{username, <<"username">>}]),
+    {ok, _} = emqtt:connect(C),
+
+    ?assertMatch(
+        {ok, _, [1]},
+        emqtt:subscribe(C, <<"t1">>, 1)
+    ),
+
+    ?assertMatch(
+        {ok, _, [1, 2]},
+        emqtt:subscribe(C, #{}, [{<<"t1">>, [{qos, 1}]}, {<<"t2">>, [{qos, 2}]}])
+    ),
+
+    ?assertMatch(
+        {ok, _, [128, 128]},
+        emqtt:subscribe(C, #{}, [{<<"t1">>, [{qos, 2}]}, {<<"t2">>, [{qos, 1}]}])
+    ),
+
+    ok = emqtt:stop(C).
+
+t_rich_actions_publish(_Config) ->
+    ok = setup_config(#{
+        <<"type">> => <<"file">>,
+        <<"enable">> => true,
+        <<"rules">> =>
+            <<
+                "{allow, {user, \"publisher\"}, {publish, [{qos, 0}]}, [\"t0\"]}."
+                "\n{allow, {user, \"publisher\"}, {publish, [{qos, 1}, {retain, true}]}, [\"t1\"]}."
+                "\n{allow, {user, \"subscriber\"}, subscribe, [\"#\"]}."
+            >>
+    }),
+
+    {ok, PC} = emqtt:start_link([{username, <<"publisher">>}]),
+    {ok, _} = emqtt:connect(PC),
+
+    {ok, SC} = emqtt:start_link([{username, <<"subscriber">>}]),
+    {ok, _} = emqtt:connect(SC),
+    {ok, _, _} = emqtt:subscribe(SC, <<"#">>, 1),
+
+    _ = emqtt:publish(PC, <<"t0">>, <<"qos0">>, [{qos, 0}]),
+    _ = emqtt:publish(PC, <<"t1">>, <<"qos1-retain">>, [{qos, 1}, {retain, true}]),
+
+    _ = emqtt:publish(PC, <<"t0">>, <<"qos1">>, [{qos, 1}]),
+    _ = emqtt:publish(PC, <<"t1">>, <<"qos1-noretain">>, [{qos, 1}, {retain, false}]),
+
+    ?assertReceive(
+        {publish, #{topic := <<"t0">>, payload := <<"qos0">>}}
+    ),
+
+    ?assertReceive(
+        {publish, #{topic := <<"t1">>, payload := <<"qos1-retain">>}}
+    ),
+
+    ?assertNotReceive(
+        {publish, #{topic := <<"t0">>, payload := <<"qos1">>}}
+    ),
+
+    ?assertNotReceive(
+        {publish, #{topic := <<"t1">>, payload := <<"qos1-noretain">>}}
+    ),
+
+    ok = emqtt:stop(PC),
+    ok = emqtt:stop(SC).
+
+%%------------------------------------------------------------------------------
+%% Helpers
+%%------------------------------------------------------------------------------
+
+setup_config(Params) ->
+    emqx_authz_test_lib:setup_config(
+        Params,
+        #{}
+    ).
+
+stop_apps(Apps) ->
+    lists:foreach(fun application:stop/1, Apps).

+ 1 - 1
apps/emqx_management/test/emqx_mgmt_api_clients_SUITE.erl

@@ -172,7 +172,7 @@ t_authz_cache(_) ->
 
     {ok, C} = emqtt:start_link(#{clientid => ClientId}),
     {ok, _} = emqtt:connect(C),
-    {ok, _, _} = emqtt:subscribe(C, <<"topic/1">>, 0),
+    {ok, _, _} = emqtt:subscribe(C, <<"topic/1">>, 1),
 
     ClientAuthzCachePath = emqx_mgmt_api_test_util:api_path([
         "clients",