Просмотр исходного кода

chore(authz): fix HTTP authz, cover with tests

Ilya Averyanov 4 лет назад
Родитель
Сommit
e0f860d7d9

+ 4 - 2
apps/emqx_authz/src/emqx_authz.erl

@@ -59,6 +59,8 @@
 
 -define(METRICS, [?METRIC_ALLOW, ?METRIC_DENY, ?METRIC_NOMATCH]).
 
+-define(IS_ENABLED(Enable), ((Enable =:= true) or (Enable =:= <<"true">>))).
+
 %% Initialize authz backend.
 %% Populate the passed configuration map with necessary data,
 %% like `ResourceID`s
@@ -155,8 +157,8 @@ do_update({?CMD_APPEND, Sources}, Conf) when is_list(Sources), is_list(Conf) ->
     NConf = Conf ++ Sources,
     ok = check_dup_types(NConf),
     NConf;
-do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := true} = Source}, Conf) when is_map(Source),
-                                                                               is_list(Conf) ->
+do_update({{?CMD_REPLACE, Type}, #{<<"enable">> := Enable} = Source}, Conf)
+  when is_map(Source), is_list(Conf), ?IS_ENABLED(Enable) ->
     case create_dry_run(Type, Source)  of
         ok ->
             {_Old, Front, Rear} = take(Type, Conf),

+ 98 - 41
apps/emqx_authz/src/emqx_authz_http.erl

@@ -40,9 +40,8 @@
 description() ->
     "AuthZ with http".
 
-init(#{url := Url} = Source) ->
-    NSource = maps:put(base_url, maps:remove(query, Url), Source),
-    case emqx_authz_utils:create_resource(emqx_connector_http, NSource) of
+init(Source) ->
+    case emqx_authz_utils:create_resource(emqx_connector_http, Source) of
         {error, Reason} -> error({load_config_error, Reason});
         {ok, Id} -> Source#{annotations => #{id => Id}}
     end.
@@ -51,39 +50,60 @@ destroy(#{annotations := #{id := Id}}) ->
     ok = emqx_resource:remove(Id).
 
 dry_run(Source) ->
-    URIMap = maps:get(url, Source),
-    NSource = maps:put(base_url, maps:remove(query, URIMap), Source),
-    emqx_resource:create_dry_run(emqx_connector_http, NSource).
+    emqx_resource:create_dry_run(emqx_connector_http, Source).
 
 authorize(Client, PubSub, Topic,
             #{type := http,
-              url := #{path := Path} = URL,
+              query := Query,
+              path := Path,
               headers := Headers,
               method := Method,
               request_timeout := RequestTimeout,
               annotations := #{id := ResourceID}
              } = Source) ->
     Request = case Method of
-                  get  ->
-                      Query = maps:get(query, URL, ""),
-                      Path1 = replvar(Path ++ "?" ++ Query, PubSub, Topic, Client),
+                  get ->
+                      Path1 = replvar(
+                                Path ++ "?" ++ Query,
+                                PubSub,
+                                Topic,
+                                maps:to_list(Client),
+                                fun var_uri_encode/1),
+
                       {Path1, maps:to_list(Headers)};
+
                   _ ->
-                      Body0 = serialize_body(
-                                maps:get('Accept', Headers, <<"application/json">>),
-                                maps:get(body, Source, #{})
-                              ),
-                      Body1 = replvar(Body0, PubSub, Topic, Client),
-                      Path1 = replvar(Path, PubSub, Topic, Client),
-                      {Path1, maps:to_list(Headers), Body1}
+                      Body0 = maps:get(body, Source, #{}),
+                      Body1 = replvar_deep(
+                                Body0,
+                                PubSub,
+                                Topic,
+                                maps:to_list(Client),
+                                fun var_bin_encode/1),
+
+                      Body2 = serialize_body(
+                                maps:get(<<"content-type">>, Headers, <<"application/json">>),
+                                Body1),
+
+                      Path1 = replvar(
+                                Path,
+                                PubSub,
+                                Topic,
+                                maps:to_list(Client),
+                                fun var_uri_encode/1),
+
+                      {Path1, maps:to_list(Headers), Body2}
               end,
-    case emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}) of
+    HttpResult = emqx_resource:query(ResourceID, {Method, Request, RequestTimeout}),
+    case HttpResult of
         {ok, 200, _Headers} ->
             {matched, allow};
         {ok, 204, _Headers} ->
             {matched, allow};
         {ok, 200, _Headers, _Body} ->
             {matched, allow};
+        {ok, _Status, _Headers} ->
+            nomatch;
         {ok, _Status, _Headers, _Body} ->
             nomatch;
         {error, Reason} ->
@@ -121,30 +141,67 @@ serialize_body(<<"application/json">>, Body) ->
 serialize_body(<<"application/x-www-form-urlencoded">>, Body) ->
     query_string(Body).
 
-replvar(Str0, PubSub, Topic,
-        #{username := Username,
-          clientid := Clientid,
-          peerhost := IpAddress,
-          protocol := Protocol,
-          mountpoint := Mountpoint
-         }) when is_list(Str0);
-                 is_binary(Str0) ->
+
+replvar_deep(Map, PubSub, Topic, Vars, VarEncode) when is_map(Map) ->
+    maps:from_list(
+      lists:map(
+        fun({Key, Value}) ->
+                {replvar(Key, PubSub, Topic, Vars, VarEncode),
+                 replvar(Value, PubSub, Topic, Vars, VarEncode)}
+        end,
+        maps:to_list(Map)));
+replvar_deep(List, PubSub, Topic, Vars, VarEncode) when is_list(List) ->
+    lists:map(
+      fun(Value) ->
+              replvar(Value, PubSub, Topic, Vars, VarEncode)
+      end,
+      List);
+replvar_deep(Number, _PubSub, _Topic, _Vars, _VarEncode) when is_number(Number) ->
+    Number;
+replvar_deep(Binary, PubSub, Topic, Vars, VarEncode) when is_binary(Binary) ->
+    replvar(Binary, PubSub, Topic, Vars, VarEncode).
+
+replvar(Str0, PubSub, Topic, [], VarEncode) ->
     NTopic = emqx_http_lib:uri_encode(Topic),
-    Str1 = re:replace( Str0, emqx_authz:ph_to_re(?PH_S_CLIENTID)
-                     , bin(Clientid), [global, {return, binary}]),
-    Str2 = re:replace( Str1, emqx_authz:ph_to_re(?PH_S_USERNAME)
-                     , bin(Username), [global, {return, binary}]),
-    Str3 = re:replace( Str2, emqx_authz:ph_to_re(?PH_S_HOST)
-                     , inet_parse:ntoa(IpAddress), [global, {return, binary}]),
-    Str4 = re:replace( Str3, emqx_authz:ph_to_re(?PH_S_PROTONAME)
-                     , bin(Protocol), [global, {return, binary}]),
-    Str5 = re:replace( Str4, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT)
-                     , bin(Mountpoint), [global, {return, binary}]),
-    Str6 = re:replace( Str5, emqx_authz:ph_to_re(?PH_S_TOPIC)
-                     , bin(NTopic), [global, {return, binary}]),
-    Str7 = re:replace( Str6, emqx_authz:ph_to_re(?PH_S_ACTION)
-                     , bin(PubSub), [global, {return, binary}]),
-    Str7.
+    Str1 = re:replace(Str0, emqx_authz:ph_to_re(?PH_S_TOPIC),
+                      VarEncode(NTopic), [global, {return, binary}]),
+    re:replace(Str1, emqx_authz:ph_to_re(?PH_S_ACTION),
+               VarEncode(PubSub), [global, {return, binary}]);
+
+
+replvar(Str, PubSub, Topic, [{username, Username} | Rest], VarEncode) ->
+    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_USERNAME),
+                      VarEncode(Username), [global, {return, binary}]),
+    replvar(Str1, PubSub, Topic, Rest, VarEncode);
+
+replvar(Str, PubSub, Topic, [{clientid, Clientid} | Rest], VarEncode) ->
+    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_CLIENTID),
+                      VarEncode(Clientid), [global, {return, binary}]),
+    replvar(Str1, PubSub, Topic, Rest, VarEncode);
+
+replvar(Str, PubSub, Topic, [{peerhost, IpAddress}  | Rest], VarEncode) ->
+    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_PEERHOST),
+                      VarEncode(inet_parse:ntoa(IpAddress)), [global, {return, binary}]),
+    replvar(Str1, PubSub, Topic, Rest, VarEncode);
+
+replvar(Str, PubSub, Topic, [{protocol, Protocol} | Rest], VarEncode) ->
+    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_PROTONAME),
+                      VarEncode(Protocol), [global, {return, binary}]),
+    replvar(Str1, PubSub, Topic, Rest, VarEncode);
+
+replvar(Str, PubSub, Topic, [{mountpoint, Mountpoint} | Rest], VarEncode) ->
+    Str1 = re:replace(Str, emqx_authz:ph_to_re(?PH_S_MOUNTPOINT),
+                      VarEncode(Mountpoint), [global, {return, binary}]),
+    replvar(Str1, PubSub, Topic, Rest, VarEncode);
+
+replvar(Str, PubSub, Topic, [_Unknown | Rest], VarEncode) ->
+    replvar(Str, PubSub, Topic, Rest, VarEncode).
+
+var_uri_encode(S) ->
+    emqx_http_lib:uri_encode(bin(S)).
+
+var_bin_encode(S) ->
+    bin(S).
 
 bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
 bin(B) when is_binary(B) -> B;

+ 4 - 7
apps/emqx_authz/src/emqx_authz_schema.erl

@@ -20,14 +20,10 @@
 
 -reflect_type([ permission/0
               , action/0
-              , url/0
               ]).
 
--typerefl_from_string({url/0, emqx_http_lib, uri_parse}).
-
 -type action() :: publish | subscribe | all.
 -type permission() :: allow | deny.
--type url() :: emqx_http_lib:uri_map().
 
 -export([ namespace/0
         , roots/0
@@ -143,10 +139,11 @@ fields(redis_cluster) ->
 http_common_fields() ->
     [ {type,            #{type => http}}
     , {enable,          #{type => boolean(), default => true}}
-    , {url,             #{type => url()}}
     , {request_timeout, mk_duration("request timeout", #{default => "30s"})}
     , {body,            #{type => map(), nullable => true}}
-    ] ++ proplists:delete(base_url, emqx_connector_http:fields(config)).
+    , {path,            #{type => string(), default => ""}}
+    , {query,           #{type => string(), default => ""}}
+    ] ++ emqx_connector_http:fields(config).
 
 mongo_common_fields() ->
     [ {collection, #{type => atom()}}
@@ -203,7 +200,7 @@ check_ssl_opts(Conf)
   when Conf =:= #{} ->
     true;
 check_ssl_opts(Conf) ->
-    case emqx_authz_http:parse_url(hocon_schema:get_value("config.url", Conf)) of
+    case emqx_authz_http:parse_url(hocon_schema:get_value("config.base_url", Conf)) of
         #{scheme := https} ->
             case hocon_schema:get_value("config.ssl.enable", Conf) of
                 true -> ok;

+ 4 - 2
apps/emqx_authz/test/emqx_authz_SUITE.erl

@@ -65,7 +65,9 @@ set_special_configs(_App) ->
 
 -define(SOURCE1, #{<<"type">> => <<"http">>,
                    <<"enable">> => true,
-                   <<"url">> => <<"https://fake.com:443/">>,
+                   <<"base_url">> => <<"https://example.com:443/">>,
+                   <<"path">> => <<"a/b">>,
+                   <<"query">> => <<"c=d">>,
                    <<"headers">> => #{},
                    <<"method">> => <<"get">>,
                    <<"request_timeout">> => 5000
@@ -77,7 +79,7 @@ set_special_configs(_App) ->
                    <<"pool_size">> => 1,
                    <<"database">> => <<"mqtt">>,
                    <<"ssl">> => #{<<"enable">> => false},
-                   <<"collection">> => <<"fake">>,
+                   <<"collection">> => <<"authz">>,
                    <<"selector">> => #{<<"a">> => <<"b">>}
                   }).
 -define(SOURCE3, #{<<"type">> => <<"mysql">>,

+ 365 - 50
apps/emqx_authz/test/emqx_authz_http_SUITE.erl

@@ -4,7 +4,8 @@
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
 %% You may obtain a copy of the License at
-%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
 %%
 %% Unless required by applicable law or agreed to in writing, software
 %% distributed under the License is distributed on an "AS IS" BASIS,
@@ -22,75 +23,389 @@
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
 
+-define(HTTP_PORT, 33333).
+-define(HTTP_PATH, "/authz/[...]").
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
-groups() ->
-    [].
-
 init_per_suite(Config) ->
-    meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
-    meck:expect(emqx_resource, remove, fun(_) -> ok end ),
-
     ok = emqx_common_test_helpers:start_apps(
            [emqx_conf, emqx_authz],
-           fun set_special_configs/1),
-
-    Rules = [#{<<"type">> => <<"http">>,
-               <<"url">> => <<"https://fake.com:443/">>,
-               <<"headers">> => #{},
-               <<"method">> => <<"get">>,
-               <<"request_timeout">> => 5000
-              }
-            ],
-    {ok, _} = emqx_authz:update(replace, Rules),
+           fun set_special_configs/1
+          ),
+    ok = start_apps([emqx_resource, emqx_connector, cowboy]),
     Config.
 
 end_per_suite(_Config) ->
-    {ok, _} = emqx:update_config(
-                [authorization],
-                #{<<"no_match">> => <<"allow">>,
-                  <<"cache">> => #{<<"enable">> => <<"true">>},
-                  <<"sources">> => []}),
-    emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
-    meck:unload(emqx_resource),
-    ok.
+    ok = emqx_authz_test_lib:restore_authorizers(),
+    ok = stop_apps([emqx_resource, emqx_connector, cowboy]),
+    ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
 
 set_special_configs(emqx_authz) ->
-    {ok, _} = emqx:update_config([authorization, cache, enable], false),
-    {ok, _} = emqx:update_config([authorization, no_match], deny),
-    {ok, _} = emqx:update_config([authorization, sources], []),
-    ok;
-set_special_configs(_App) ->
+    ok = emqx_authz_test_lib:reset_authorizers();
+
+set_special_configs(_) ->
     ok.
 
+init_per_testcase(_Case, Config) ->
+    ok = emqx_authz_test_lib:reset_authorizers(),
+    ok = emqx_authz_http_test_server:start(?HTTP_PORT, ?HTTP_PATH),
+    Config.
+
+end_per_testcase(_Case, _Config) ->
+    ok = emqx_authz_http_test_server:stop().
+
 %%------------------------------------------------------------------------------
-%% Testcases
+%% Tests
 %%------------------------------------------------------------------------------
 
-t_authz(_) ->
-    ClientInfo = #{clientid => <<"my-clientid">>,
-                   username => <<"my-username">>,
+t_response_handling(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
                    peerhost => {127,0,0,1},
-                   protocol => mqtt,
-                   mountpoint => <<"fake">>,
                    zone => default,
                    listener => {tcp, default}
-                   },
+                  },
 
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, 204, fake_headers} end),
-    ?assertEqual(allow,
-                 emqx_access_control:authorize(ClientInfo, subscribe, <<"#">>)),
+    %% OK, get, no body
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                   Req = cowboy_req:reply(200, Req0),
+                   {ok, Req, State}
+           end,
+           #{}),
 
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, 200, fake_headers, fake_body} end),
-    ?assertEqual(allow,
-                 emqx_access_control:authorize(ClientInfo, publish, <<"#">>)),
+    allow = emqx_access_control:authorize(ClientInfo, publish, <<"t">>),
 
+    %% OK, get, body & headers
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                   Req = cowboy_req:reply(
+                           200,
+                           #{<<"content-type">> => <<"text/plain">>},
+                           "Response body",
+                           Req0),
+                   {ok, Req, State}
+           end,
+           #{}),
 
-    meck:expect(emqx_resource, query, fun(_, _) -> {error, other} end),
-    ?assertEqual(deny,
-        emqx_access_control:authorize(ClientInfo, subscribe, <<"+">>)),
-    ?assertEqual(deny,
-        emqx_access_control:authorize(ClientInfo, publish, <<"+">>)),
-    ok.
+    ?assertEqual(
+        allow,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)),
+
+    %% OK, get, 204
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                   Req = cowboy_req:reply(204, Req0),
+                   {ok, Req, State}
+           end,
+           #{}),
+
+    ?assertEqual(
+        allow,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)),
+
+    %% Not OK, get, 400
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                   Req = cowboy_req:reply(400, Req0),
+                   {ok, Req, State}
+           end,
+           #{}),
+
+    ?assertEqual(
+        deny,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)),
+
+    %% Not OK, get, 400 + body & headers
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                   Req = cowboy_req:reply(
+                           400,
+                           #{<<"content-type">> => <<"text/plain">>},
+                           "Response body",
+                           Req0),
+                   {ok, Req, State}
+           end,
+           #{}),
+
+    ?assertEqual(
+        deny,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)).
+
+t_query_params(_Config) ->
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                  #{username := <<"user name">>,
+                    clientid := <<"client id">>,
+                    peerhost := <<"127.0.0.1">>,
+                    proto_name := <<"MQTT">>,
+                    mountpoint := <<"MOUNTPOINT">>,
+                    topic := <<"t">>,
+                    action := <<"publish">>
+                   } = cowboy_req:match_qs(
+                         [username,
+                          clientid,
+                          peerhost,
+                          proto_name,
+                          mountpoint,
+                          topic,
+                          action],
+                         Req0),
+                   Req = cowboy_req:reply(200, Req0),
+                   {ok, Req, State}
+           end,
+           #{<<"query">> => <<"username=${username}&"
+                             "clientid=${clientid}&"
+                             "peerhost=${peerhost}&"
+                             "proto_name=${proto_name}&"
+                             "mountpoint=${mountpoint}&"
+                             "topic=${topic}&"
+                             "action=${action}">>
+            }),
+
+    ClientInfo = #{clientid => <<"client id">>,
+                   username => <<"user name">>,
+                   peerhost => {127,0,0,1},
+                   protocol => <<"MQTT">>,
+                   mountpoint => <<"MOUNTPOINT">>,
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ?assertEqual(
+        allow,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)).
+
+t_path_params(_Config) ->
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                   <<"/authz/"
+                     "username/user%20name/"
+                     "clientid/client%20id/"
+                     "peerhost/127.0.0.1/"
+                     "proto_name/MQTT/"
+                     "mountpoint/MOUNTPOINT/"
+                     "topic/t/"
+                     "action/publish">> = cowboy_req:path(Req0),
+                   Req = cowboy_req:reply(200, Req0),
+                   {ok, Req, State}
+           end,
+           #{<<"path">> => <<"username/${username}/"
+                             "clientid/${clientid}/"
+                             "peerhost/${peerhost}/"
+                             "proto_name/${proto_name}/"
+                             "mountpoint/${mountpoint}/"
+                             "topic/${topic}/"
+                             "action/${action}">>
+            }),
+
+    ClientInfo = #{clientid => <<"client id">>,
+                   username => <<"user name">>,
+                   peerhost => {127,0,0,1},
+                   protocol => <<"MQTT">>,
+                   mountpoint => <<"MOUNTPOINT">>,
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ?assertEqual(
+        allow,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)).
+
+t_json_body(_Config) ->
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                   ?assertEqual(
+                      <<"/authz/"
+                        "username/user%20name/"
+                        "clientid/client%20id/"
+                        "peerhost/127.0.0.1/"
+                        "proto_name/MQTT/"
+                        "mountpoint/MOUNTPOINT/"
+                        "topic/t/"
+                        "action/publish">>,
+                      cowboy_req:path(Req0)),
+
+                   {ok, RawBody, Req1} = cowboy_req:read_body(Req0),
+
+                   ?assertMatch(
+                      #{<<"username">> := <<"user name">>,
+                        <<"CLIENT_client id">> := <<"client id">>,
+                        <<"peerhost">> := <<"127.0.0.1">>,
+                        <<"proto_name">> := <<"MQTT">>,
+                        <<"mountpoint">> := <<"MOUNTPOINT">>,
+                        <<"topic">> := <<"t">>,
+                        <<"action">> := <<"publish">>},
+                      jiffy:decode(RawBody, [return_maps])),
+
+                   Req = cowboy_req:reply(200, Req1),
+                   {ok, Req, State}
+           end,
+           #{<<"method">> => <<"post">>,
+             <<"path">> => <<"username/${username}/"
+                             "clientid/${clientid}/"
+                             "peerhost/${peerhost}/"
+                             "proto_name/${proto_name}/"
+                             "mountpoint/${mountpoint}/"
+                             "topic/${topic}/"
+                             "action/${action}">>,
+             <<"body">> => #{<<"username">> => <<"${username}">>,
+                             <<"CLIENT_${clientid}">> => <<"${clientid}">>,
+                             <<"peerhost">> => <<"${peerhost}">>,
+                             <<"proto_name">> => <<"${proto_name}">>,
+                             <<"mountpoint">> => <<"${mountpoint}">>,
+                             <<"topic">> => <<"${topic}">>,
+                             <<"action">> => <<"${action}">>}
+            }),
+
+    ClientInfo = #{clientid => <<"client id">>,
+                   username => <<"user name">>,
+                   peerhost => {127,0,0,1},
+                   protocol => <<"MQTT">>,
+                   mountpoint => <<"MOUNTPOINT">>,
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ?assertEqual(
+        allow,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)).
+
+
+t_form_body(_Config) ->
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                   ?assertEqual(
+                      <<"/authz/"
+                        "username/user%20name/"
+                        "clientid/client%20id/"
+                        "peerhost/127.0.0.1/"
+                        "proto_name/MQTT/"
+                        "mountpoint/MOUNTPOINT/"
+                        "topic/t/"
+                        "action/publish">>,
+                      cowboy_req:path(Req0)),
+                    
+                   {ok, PostVars, Req1} = cowboy_req:read_urlencoded_body(Req0),
+
+                   ?assertMatch(
+                      #{<<"username">> := <<"user name">>,
+                        <<"clientid">> := <<"client id">>,
+                        <<"peerhost">> := <<"127.0.0.1">>,
+                        <<"proto_name">> := <<"MQTT">>,
+                        <<"mountpoint">> := <<"MOUNTPOINT">>,
+                        <<"topic">> := <<"t">>,
+                        <<"action">> := <<"publish">>},
+                      maps:from_list(PostVars)),
+
+                   Req = cowboy_req:reply(200, Req1),
+                   {ok, Req, State}
+           end,
+           #{<<"method">> => <<"post">>,
+             <<"path">> => <<"username/${username}/"
+                             "clientid/${clientid}/"
+                             "peerhost/${peerhost}/"
+                             "proto_name/${proto_name}/"
+                             "mountpoint/${mountpoint}/"
+                             "topic/${topic}/"
+                             "action/${action}">>,
+             <<"body">> => #{<<"username">> => <<"${username}">>,
+                             <<"clientid">> => <<"${clientid}">>,
+                             <<"peerhost">> => <<"${peerhost}">>,
+                             <<"proto_name">> => <<"${proto_name}">>,
+                             <<"mountpoint">> => <<"${mountpoint}">>,
+                             <<"topic">> => <<"${topic}">>,
+                             <<"action">> => <<"${action}">>},
+             <<"headers">> => #{<<"content-type">> => <<"application/x-www-form-urlencoded">>}
+            }),
+
+    ClientInfo = #{clientid => <<"client id">>,
+                   username => <<"user name">>,
+                   peerhost => {127,0,0,1},
+                   protocol => <<"MQTT">>,
+                   mountpoint => <<"MOUNTPOINT">>,
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ?assertEqual(
+        allow,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)).
+
+
+t_create_replace(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    %% Bad URL
+    ok = setup_handler_and_config(
+           fun(Req0, State) ->
+                   Req = cowboy_req:reply(200, Req0),
+                   {ok, Req, State}
+           end,
+           #{<<"base_url">> => <<"http://127.0.0.1:33331/authz">>}),
+
+
+    ?assertEqual(
+        deny,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)),
+
+    %% Changing to other bad config does not work
+    BadConfig = maps:merge(
+                  raw_http_authz_config(),
+                  #{<<"base_url">> => <<"http://127.0.0.1:33332/authz">>}),
+
+    ?assertMatch(
+        {error, _},
+        emqx_authz:update({?CMD_REPLACE, http}, BadConfig)),
+
+    ?assertEqual(
+        deny,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)),
+
+    %% Changing to valid config
+    OkConfig = maps:merge(
+                  raw_http_authz_config(),
+                  #{<<"base_url">> => <<"http://127.0.0.1:33333/authz">>}),
+    
+    ?assertMatch(
+        {ok, _},
+        emqx_authz:update({?CMD_REPLACE, http}, OkConfig)),
+
+    ?assertEqual(
+        allow,
+        emqx_access_control:authorize(ClientInfo, publish, <<"t">>)).
+
+%%------------------------------------------------------------------------------
+%% Helpers
+%%------------------------------------------------------------------------------
+
+raw_http_authz_config() ->
+    #{
+        <<"enable">> => <<"true">>,
+
+        <<"type">> => <<"http">>,
+        <<"method">> => <<"get">>,
+        <<"base_url">> => <<"http://127.0.0.1:33333/authz">>,
+        <<"path">> => <<"users/${username}/">>,
+        <<"query">> => <<"topic=${topic}&action=${action}">>,
+        <<"headers">> => #{<<"X-Test-Header">> => <<"Test Value">>}
+    }.
+
+setup_handler_and_config(Handler, Config) ->
+    ok = emqx_authz_http_test_server:set_handler(Handler),
+    ok = emqx_authz_test_lib:setup_config(
+           raw_http_authz_config(),
+           Config).
+
+start_apps(Apps) ->
+    lists:foreach(fun application:ensure_all_started/1, Apps).
+
+stop_apps(Apps) ->
+    lists:foreach(fun application:stop/1, Apps).

+ 89 - 0
apps/emqx_authz/test/emqx_authz_http_test_server.erl

@@ -0,0 +1,89 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authz_http_test_server).
+
+-behaviour(gen_server).
+-behaviour(cowboy_handler).
+
+% cowboy_server callbacks
+-export([init/2]).
+
+% gen_server callbacks
+-export([init/1,
+         handle_call/3,
+         handle_cast/2
+        ]).
+
+% API
+-export([start/2,
+         stop/0,
+         set_handler/1
+        ]).
+
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
+
+start(Port, Path) ->
+    Dispatch = cowboy_router:compile([
+        {'_', [{Path, ?MODULE, []}]}
+    ]),
+    {ok, _} = cowboy:start_clear(?MODULE,
+        [{port, Port}],
+        #{env => #{dispatch => Dispatch}}
+    ),
+    {ok, _} = gen_server:start_link({local, ?MODULE}, ?MODULE, [], []),
+    ok.
+
+stop() ->
+    gen_server:stop(?MODULE),
+    cowboy:stop_listener(?MODULE).
+
+set_handler(F) when is_function(F, 2) ->
+    gen_server:call(?MODULE, {set_handler, F}).
+
+%%------------------------------------------------------------------------------
+%% gen_server API
+%%------------------------------------------------------------------------------
+
+init([]) ->
+    F = fun(Req0, State) ->
+                Req = cowboy_req:reply(
+                        400,
+                        #{<<"content-type">> => <<"text/plain">>},
+                        <<"">>,
+                        Req0),
+                {ok, Req, State}
+        end,
+    {ok, F}.
+
+handle_cast(_, F) ->
+    {noreply, F}.
+
+handle_call({set_handler, F}, _From, _F) ->
+    {reply, ok, F};
+
+handle_call(get_handler, _From, F) ->
+    {reply, F, F}.
+
+%%------------------------------------------------------------------------------
+%% cowboy_server API
+%%------------------------------------------------------------------------------
+
+init(Req, State) ->
+    Handler = gen_server:call(?MODULE, get_handler),
+    Handler(Req, State).