ソースを参照

Merge pull request #7586 from lafirest/test/gateway_authz

test(gateway): integration gateway test with http authz
JianBo He 3 年 前
コミット
56191d42e7

+ 3 - 0
apps/emqx_gateway/test/emqx_coap_SUITE.erl

@@ -88,6 +88,9 @@ default_config() ->
 mqtt_prefix() ->
     ?MQTT_PREFIX.
 
+ps_prefix() ->
+    ?PS_PREFIX.
+
 %%--------------------------------------------------------------------
 %% Test Cases
 %%--------------------------------------------------------------------

+ 49 - 6
apps/emqx_gateway/test/emqx_gateway_auth_ct.erl

@@ -47,8 +47,12 @@
 
 -define(CALL(Msg), gen_server:call(?MODULE, {?FUNCTION_NAME, Msg})).
 
--define(HTTP_PORT, 37333).
--define(HTTP_PATH, "/auth").
+-define(AUTHN_HTTP_PORT, 37333).
+-define(AUTHN_HTTP_PATH, "/auth").
+
+-define(AUTHZ_HTTP_PORT, 38333).
+-define(AUTHZ_HTTP_PATH, "/authz/[...]").
+
 -define(GATEWAYS, [coap, lwm2m, mqttsn, stomp, exproto]).
 
 -define(CONFS, [
@@ -123,14 +127,14 @@ format_status(_Opt, Status) ->
 
 on_start_auth(authn_http) ->
     %% start test server
-    {ok, _} = emqx_authn_http_test_server:start_link(?HTTP_PORT, ?HTTP_PATH),
+    {ok, _} = emqx_authn_http_test_server:start_link(?AUTHN_HTTP_PORT, ?AUTHN_HTTP_PATH),
     timer:sleep(1000),
 
     %% set authn for gateway
     Setup = fun(Gateway) ->
         Path = io_lib:format("/gateway/~ts/authentication", [Gateway]),
         {204, _} = request(delete, Path),
-        {201, _} = request(post, Path, http_auth_config())
+        {201, _} = request(post, Path, http_authn_config())
     end,
     lists:foreach(Setup, ?GATEWAYS),
 
@@ -150,6 +154,33 @@ on_start_auth(authn_http) ->
     end,
     emqx_authn_http_test_server:set_handler(Handler),
 
+    timer:sleep(500);
+on_start_auth(authz_http) ->
+    ok = emqx_authz_test_lib:reset_authorizers(),
+    {ok, _} = emqx_authz_http_test_server:start_link(?AUTHZ_HTTP_PORT, ?AUTHZ_HTTP_PATH),
+
+    %% TODO set authz for gateway
+    ok = emqx_authz_test_lib:setup_config(
+        http_authz_config(),
+        #{}
+    ),
+
+    %% set handler for test server
+    Handler = fun(Req0, State) ->
+        case cowboy_req:match_qs([topic, action, username], Req0) of
+            #{topic := <<"/publish">>, action := <<"publish">>} ->
+                Req = cowboy_req:reply(200, Req0);
+            #{topic := <<"/subscribe">>, action := <<"subscribe">>} ->
+                Req = cowboy_req:reply(200, Req0);
+            %% for lwm2m
+            #{username := <<"lwm2m">>} ->
+                Req = cowboy_req:reply(200, Req0);
+            _ ->
+                Req = cowboy_req:reply(400, Req0)
+        end,
+        {ok, Req, State}
+    end,
+    ok = emqx_authz_http_test_server:set_handler(Handler),
     timer:sleep(500).
 
 on_stop_auth(authn_http) ->
@@ -158,13 +189,15 @@ on_stop_auth(authn_http) ->
         {204, _} = request(delete, Path)
     end,
     lists:foreach(Delete, ?GATEWAYS),
-    ok = emqx_authn_http_test_server:stop().
+    ok = emqx_authn_http_test_server:stop();
+on_stop_auth(authz_http) ->
+    ok = emqx_authz_http_test_server:stop().
 
 %%------------------------------------------------------------------------------
 %% Configs
 %%------------------------------------------------------------------------------
 
-http_auth_config() ->
+http_authn_config() ->
     #{
         <<"mechanism">> => <<"password_based">>,
         <<"enable">> => <<"true">>,
@@ -175,6 +208,16 @@ http_auth_config() ->
         <<"headers">> => #{<<"X-Test-Header">> => <<"Test Value">>}
     }.
 
+http_authz_config() ->
+    #{
+        <<"enable">> => <<"true">>,
+        <<"type">> => <<"http">>,
+        <<"method">> => <<"get">>,
+        <<"url">> =>
+            <<"http://127.0.0.1:38333/authz/users/?topic=${topic}&action=${action}&username=${username}">>,
+        <<"headers">> => #{<<"X-Test-Header">> => <<"Test Value">>}
+    }.
+
 %%------------------------------------------------------------------------------
 %% Helpers
 %%------------------------------------------------------------------------------

+ 446 - 0
apps/emqx_gateway/test/emqx_gateway_authz_SUITE.erl

@@ -0,0 +1,446 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_gateway_authz_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+-import(emqx_gateway_auth_ct, [init_gateway_conf/0, with_resource/3]).
+
+-define(checkMatch(Guard),
+    (fun(Expr) ->
+        case (Expr) of
+            Guard ->
+                ok;
+            X__V ->
+                erlang:error(
+                    {assertMatch, [
+                        {module, ?MODULE},
+                        {line, ?LINE},
+                        {expression, (??Expr)},
+                        {pattern, (??Guard)},
+                        {value, X__V}
+                    ]}
+                )
+        end
+    end)
+).
+-define(FUNCTOR(Expr), fun() -> Expr end).
+-define(FUNCTOR(Arg, Expr), fun(Arg) -> Expr end).
+
+-define(AUTHNS, [authz_http]).
+
+all() ->
+    emqx_gateway_auth_ct:group_names(?AUTHNS).
+
+groups() ->
+    emqx_gateway_auth_ct:init_groups(?MODULE, ?AUTHNS).
+
+init_per_group(AuthName, Conf) ->
+    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
+    ok = emqx_authz_test_lib:reset_authorizers(),
+    emqx_gateway_auth_ct:start_auth(AuthName),
+    timer:sleep(500),
+    Conf.
+
+end_per_group(AuthName, Conf) ->
+    emqx_gateway_auth_ct:stop_auth(AuthName),
+    Conf.
+
+init_per_suite(Config) ->
+    emqx_config:erase(gateway),
+    init_gateway_conf(),
+    meck:new(emqx_authz_file, [non_strict, passthrough, no_history, no_link]),
+    meck:expect(emqx_authz_file, init, fun(S) -> S end),
+    emqx_mgmt_api_test_util:init_suite([emqx_conf, emqx_authz, emqx_gateway]),
+    application:ensure_all_started(cowboy),
+    emqx_gateway_auth_ct:start(),
+    Config.
+
+end_per_suite(Config) ->
+    meck:unload(emqx_authz_file),
+    emqx_gateway_auth_ct:stop(),
+    ok = emqx_authz_test_lib:restore_authorizers(),
+    emqx_config:erase(gateway),
+    emqx_mgmt_api_test_util:end_suite([cowboy, emqx_authz, emqx_gateway]),
+    Config.
+
+init_per_testcase(_Case, Config) ->
+    {ok, _} = emqx_cluster_rpc:start_link(node(), emqx_cluster_rpc, 1000),
+    Config.
+
+end_per_testcase(_Case, Config) ->
+    Config.
+
+%%------------------------------------------------------------------------------
+%% Tests
+%%------------------------------------------------------------------------------
+
+t_case_coap_publish(_) ->
+    Mod = emqx_coap_SUITE,
+    Prefix = Mod:ps_prefix(),
+    Fun = fun(Channel, Token, Topic, Checker) ->
+        TopicStr = binary_to_list(Topic),
+        URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+
+        Req = Mod:make_req(post, <<>>),
+        Checker(Mod:do_request(Channel, URI, Req))
+    end,
+    Case = fun(Channel, Token) ->
+        Fun(Channel, Token, <<"/publish">>, ?checkMatch({ok, changed, _})),
+        Fun(Channel, Token, <<"/badpublish">>, ?checkMatch({error, uauthorized}))
+    end,
+    Mod:with_connection(Case).
+
+t_case_coap_subscribe(_) ->
+    Mod = emqx_coap_SUITE,
+    Prefix = Mod:ps_prefix(),
+    Fun = fun(Channel, Token, Topic, Checker) ->
+        TopicStr = binary_to_list(Topic),
+        URI = Prefix ++ TopicStr ++ "?clientid=client1&token=" ++ Token,
+
+        Req = Mod:make_req(get, <<>>, [{observe, 0}]),
+        Checker(Mod:do_request(Channel, URI, Req))
+    end,
+    Case = fun(Channel, Token) ->
+        Fun(Channel, Token, <<"/subscribe">>, ?checkMatch({ok, content, _})),
+        Fun(Channel, Token, <<"/badsubscribe">>, ?checkMatch({error, uauthorized}))
+    end,
+    Mod:with_connection(Case).
+
+-record(coap_content, {content_format, payload = <<>>}).
+
+t_case_lwm2m(_) ->
+    MsgId = 12,
+    Mod = emqx_lwm2m_SUITE,
+    Epn = "urn:oma:lwm2m:oma:3",
+    Port = emqx_lwm2m_SUITE:default_port(),
+    URI = "coap://127.0.0.1:~b/rd?ep=~ts&imei=~ts&lt=345&lwm2m=1&password=public",
+    Test = fun(Username, Checker) ->
+        SubTopic = list_to_binary("lwm2m/" ++ Username ++ "/dn/#"),
+        ReportTopic = list_to_binary("lwm2m/" ++ Username ++ "/up/resp"),
+        with_resource(
+            ?FUNCTOR(gen_udp:open(0, [binary, {active, false}])),
+            ?FUNCTOR(Socket, gen_udp:close(Socket)),
+            fun(Socket) ->
+                Send = fun() ->
+                    Mod:test_send_coap_request(
+                        Socket,
+                        post,
+                        Mod:sprintf(URI, [Port, Epn, Username]),
+                        #coap_content{
+                            content_format = <<"text/plain">>,
+                            payload = <<"</1>, </2>, </3>, </4>, </5>">>
+                        },
+                        [],
+                        MsgId
+                    ),
+
+                    LoginResult = Mod:test_recv_coap_response(Socket),
+                    ?assertEqual(ack, emqx_coap_SUITE:get_field(type, LoginResult)),
+                    ?assertEqual({ok, created}, emqx_coap_SUITE:get_field(method, LoginResult))
+                end,
+                try_publish_recv(ReportTopic, Send, fun(Data) -> Checker(SubTopic, Data) end)
+            end
+        )
+    end,
+    Test("lwm2m", fun(SubTopic, Msg) ->
+        ?assertEqual(true, lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics())),
+        Payload = emqx_message:payload(Msg),
+        Cmd = emqx_json:decode(Payload, [return_maps]),
+        ?assertMatch(#{<<"msgType">> := <<"register">>, <<"data">> := _}, Cmd)
+    end),
+
+    Test("/baduser", fun(SubTopic, Data) ->
+        ?assertEqual(false, lists:member(SubTopic, test_mqtt_broker:get_subscrbied_topics())),
+        ?assertEqual(timeout, Data)
+    end),
+    ok.
+
+-define(SN_CONNACK, 16#05).
+t_case_sn_publish(_) ->
+    set_sn_predefined_topic(),
+    Mod = emqx_sn_protocol_SUITE,
+    Payload = <<"publish with authz">>,
+    Publish = fun(TopicId, Topic, Checker) ->
+        with_resource(
+            ?FUNCTOR(gen_udp:open(0, [binary])),
+            ?FUNCTOR(Socket, gen_udp:close(Socket)),
+            fun(Socket) ->
+                Mod:send_connect_msg(Socket, <<"client_id_test1">>),
+                ?assertEqual(<<3, ?SN_CONNACK, 0>>, Mod:receive_response(Socket)),
+
+                Send = fun() ->
+                    Mod:send_publish_msg_normal_topic(Socket, 0, 1, TopicId, Payload)
+                end,
+                try_publish_recv(Topic, Send, Checker)
+            end
+        )
+    end,
+    Publish(1, <<"/publish">>, fun(Msg) -> ?assertMatch(Payload, emqx_message:payload(Msg)) end),
+    Publish(2, <<"/badpublish">>, fun(Msg) -> ?assertEqual(timeout, Msg) end),
+    ok.
+
+t_case_sn_subscribe(_) ->
+    set_sn_predefined_topic(),
+    Mod = emqx_sn_protocol_SUITE,
+    Payload = <<"subscribe with authz">>,
+    Sub = fun(Topic, Checker) ->
+        with_resource(
+            ?FUNCTOR(gen_udp:open(0, [binary])),
+            ?FUNCTOR(Socket, gen_udp:close(Socket)),
+            fun(Socket) ->
+                Mod:send_connect_msg(Socket, <<"client_id_test1">>),
+                ?assertEqual(<<3, ?SN_CONNACK, 0>>, Mod:receive_response(Socket)),
+
+                Mod:send_subscribe_msg_normal_topic(Socket, 0, Topic, 1),
+                _ = Mod:receive_response(Socket),
+
+                timer:sleep(100),
+                Msg = emqx_message:make(Topic, Payload),
+                emqx:publish(Msg),
+
+                timer:sleep(100),
+
+                Recv = Mod:receive_response(Socket),
+                Checker(Recv)
+            end
+        )
+    end,
+    Sub(<<"/subscribe">>, fun(Data) ->
+        {ok, Msg, _, _} = emqx_sn_frame:parse(Data, undefined),
+        ?assertMatch({mqtt_sn_message, _, {_, 3, 0, Payload}}, Msg)
+    end),
+    Sub(<<"/badsubscribe">>, fun(Data) ->
+        ?assertEqual(udp_receive_timeout, Data)
+    end),
+    ok.
+
+t_case_stomp_publish(_) ->
+    Mod = emqx_stomp_SUITE,
+    Payload = <<"publish with authz">>,
+    Publish = fun(Topic, Checker) ->
+        Fun = fun(Sock) ->
+            gen_tcp:send(
+                Sock,
+                Mod:serialize(
+                    <<"CONNECT">>,
+                    [
+                        {<<"accept-version">>, Mod:stomp_ver()},
+                        {<<"host">>, <<"127.0.0.1:61613">>},
+                        {<<"login">>, <<"guest">>},
+                        {<<"passcode">>, <<"guest">>},
+                        {<<"heart-beat">>, <<"0,0">>}
+                    ]
+                )
+            ),
+            {ok, Data} = gen_tcp:recv(Sock, 0),
+            {ok, Frame, _, _} = Mod:parse(Data),
+            ?assertEqual(<<"CONNECTED">>, Mod:get_field(command, Frame)),
+            Send = fun() ->
+                gen_tcp:send(
+                    Sock,
+                    Mod:serialize(
+                        <<"SEND">>,
+                        [{<<"destination">>, Topic}],
+                        Payload
+                    )
+                )
+            end,
+            try_publish_recv(Topic, Send, Checker)
+        end,
+        Mod:with_connection(Fun)
+    end,
+    Publish(<<"/publish">>, fun(Msg) -> ?assertMatch(Payload, emqx_message:payload(Msg)) end),
+    Publish(<<"/badpublish">>, fun(Msg) -> ?assertEqual(timeout, Msg) end),
+    ok.
+
+t_case_stomp_subscribe(_) ->
+    Mod = emqx_stomp_SUITE,
+    Payload = <<"subscribe with authz">>,
+    Sub = fun(Topic, Checker) ->
+        Fun = fun(Sock) ->
+            gen_tcp:send(
+                Sock,
+                Mod:serialize(
+                    <<"CONNECT">>,
+                    [
+                        {<<"accept-version">>, Mod:stomp_ver()},
+                        {<<"host">>, <<"127.0.0.1:61613">>},
+                        {<<"login">>, <<"guest">>},
+                        {<<"passcode">>, <<"guest">>},
+                        {<<"heart-beat">>, <<"0,0">>}
+                    ]
+                )
+            ),
+            {ok, Data} = gen_tcp:recv(Sock, 0),
+            {ok, Frame, _, _} = Mod:parse(Data),
+            ?assertEqual(<<"CONNECTED">>, Mod:get_field(command, Frame)),
+
+            %% Subscribe
+            gen_tcp:send(
+                Sock,
+                Mod:serialize(
+                    <<"SUBSCRIBE">>,
+                    [
+                        {<<"id">>, 0},
+                        {<<"destination">>, Topic},
+                        {<<"ack">>, <<"auto">>}
+                    ]
+                )
+            ),
+
+            timer:sleep(100),
+            Msg = emqx_message:make(Topic, Payload),
+            emqx:publish(Msg),
+
+            timer:sleep(100),
+            {ok, Data1} = gen_tcp:recv(Sock, 0, 2000),
+            {ok, Frame1, _, _} = Mod:parse(Data1),
+            Checker(Frame1)
+        end,
+        Mod:with_connection(Fun)
+    end,
+    Sub(<<"/subscribe">>, fun(Frame) ->
+        ?assertMatch(<<"MESSAGE">>, Mod:get_field(command, Frame)),
+        ?assertMatch(Payload, Mod:get_field(body, Frame))
+    end),
+    Sub(<<"/badsubscribe">>, fun(Frame) ->
+        ?assertMatch(<<"ERROR">>, Mod:get_field(command, Frame)),
+        ?assertMatch(<<"ACL Deny">>, Mod:get_field(body, Frame))
+    end),
+    ok.
+
+t_case_exproto_publish(_) ->
+    Mod = emqx_exproto_SUITE,
+    SvrMod = emqx_exproto_echo_svr,
+    Svrs = SvrMod:start(),
+    Payload = <<"publish with authz">>,
+    Publish = fun(Topic, Checker) ->
+        with_resource(
+            ?FUNCTOR(Mod:open(tcp)),
+            ?FUNCTOR(Sock, Mod:close(Sock)),
+            fun(Sock) ->
+                Client = #{
+                    proto_name => <<"demo">>,
+                    proto_ver => <<"v0.1">>,
+                    clientid => <<"test_client_1">>,
+                    username => <<"admin">>
+                },
+
+                ConnBin = SvrMod:frame_connect(Client, <<"public">>),
+
+                Mod:send(Sock, ConnBin),
+                {ok, Recv} = Mod:recv(Sock, 5000),
+                C = ?FUNCTOR(Bin, emqx_json:decode(Bin, [return_maps])),
+                ?assertEqual(C(SvrMod:frame_connack(0)), C(Recv)),
+
+                Send = fun() ->
+                    PubBin = SvrMod:frame_publish(Topic, 0, Payload),
+                    Mod:send(Sock, PubBin)
+                end,
+                try_publish_recv(Topic, Send, Checker)
+            end
+        )
+    end,
+    Publish(<<"/publish">>, fun(Msg) -> ?assertMatch(Payload, emqx_message:payload(Msg)) end),
+    Publish(<<"/badpublish">>, fun(Msg) -> ?assertEqual(timeout, Msg) end),
+    SvrMod:stop(Svrs),
+    ok.
+
+t_case_exproto_subscribe(_) ->
+    Mod = emqx_exproto_SUITE,
+    SvrMod = emqx_exproto_echo_svr,
+    Svrs = SvrMod:start(),
+    WaitTime = 5000,
+    Sub = fun(Topic, ErrorCode) ->
+        with_resource(
+            ?FUNCTOR(Mod:open(tcp)),
+            ?FUNCTOR(Sock, Mod:close(Sock)),
+            fun(Sock) ->
+                Client = #{
+                    proto_name => <<"demo">>,
+                    proto_ver => <<"v0.1">>,
+                    clientid => <<"test_client_1">>,
+                    username => <<"admin">>
+                },
+
+                ConnBin = SvrMod:frame_connect(Client, <<"public">>),
+
+                Mod:send(Sock, ConnBin),
+                {ok, Recv} = Mod:recv(Sock, WaitTime),
+                C = ?FUNCTOR(Bin, emqx_json:decode(Bin, [return_maps])),
+                ?assertEqual(C(SvrMod:frame_connack(0)), C(Recv)),
+
+                SubBin = SvrMod:frame_subscribe(Topic, 0),
+                Mod:send(Sock, SubBin),
+                {ok, SubAckBin} = Mod:recv(Sock, WaitTime),
+                ?assertEqual(SvrMod:frame_suback(ErrorCode), SubAckBin)
+            end
+        )
+    end,
+
+    Sub(<<"/subscribe">>, 0),
+    Sub(<<"/badsubscribe">>, 1),
+    SvrMod:stop(Svrs),
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Helpers
+%%------------------------------------------------------------------------------
+try_publish_recv(Topic, Publish, Checker) ->
+    emqx:subscribe(Topic),
+    Clear = fun(Msg) ->
+        emqx:unsubscribe(Topic),
+        Checker(Msg)
+    end,
+    Publish(),
+    timer:sleep(200),
+    receive
+        {deliver, Topic, Msg} ->
+            Clear(Msg)
+    after 500 ->
+        Clear(timeout)
+    end.
+
+set_sn_predefined_topic() ->
+    RawCfg = emqx_conf:get_raw([gateway, mqttsn], #{}),
+    NewCfg = RawCfg#{
+        <<"predefined">> => [
+            #{
+                id => 1,
+                topic => "/publish"
+            },
+            #{
+                id => 2,
+                topic => "/badpublish"
+            },
+            #{
+                id => 3,
+                topic => "/subscribe"
+            },
+            #{
+                id => 4,
+                topic => "/badsubscribe"
+            }
+        ]
+    },
+    emqx_gateway_conf:update_gateway(mqttsn, NewCfg),
+    ok.