Просмотр исходного кода

feat(authz): support http

Signed-off-by: zhanghongtong <rory-z@outlook.com>
zhanghongtong 4 лет назад
Родитель
Сommit
bb417e4498

+ 1 - 1
apps/emqx/rebar.config

@@ -15,7 +15,7 @@
     , {esockd, {git, "https://github.com/emqx/esockd", {tag, "5.8.0"}}}
     , {ekka, {git, "https://github.com/emqx/ekka", {tag, "0.10.3"}}}
     , {gen_rpc, {git, "https://github.com/emqx/gen_rpc", {tag, "2.5.1"}}}
-    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.6"}}}
+    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.10.3"}}}
     , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
     , {recon, {git, "https://github.com/ferd/recon", {tag, "2.5.1"}}}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}

+ 10 - 0
apps/emqx_authz/etc/emqx_authz.conf

@@ -1,5 +1,15 @@
 emqx_authz:{
     rules: [
+       # {
+       #      type: http
+       #      config: {
+       #          url: "https://emqx.com"
+       #          headers: {
+       #              Accept: "application/json"
+       #              Content-Type: "application/json"
+       #          }
+       #      }
+       # },
        # {
        #     type: mysql
        #     config: {

+ 10 - 2
apps/emqx_authz/src/emqx_authz.erl

@@ -88,6 +88,14 @@ compile(#{topics := Topics,
           topics => NTopics
          };
 
+compile(#{principal := Principal,
+          type := http,
+          config := #{url := Url} = Config
+         } = Rule) ->
+    NConfig = maps:merge(Config, #{base_url => maps:remove(query, Url)}),
+    NRule = create_resource(Rule#{config := NConfig}),
+    NRule#{principal => compile_principal(Principal)};
+
 compile(#{principal := Principal,
           type := DB
          } = Rule) when DB =:= redis;
@@ -150,8 +158,8 @@ b2l(B) when is_binary(B) -> binary_to_list(B).
 -spec(authorize(emqx_types:clientinfo(), emqx_types:all(), emqx_topic:topic(), emqx_permission_rule:acl_result(), rules())
       -> {stop, allow} | {ok, deny}).
 authorize(#{username := Username,
-              peerhost := IpAddress
-             } = Client, PubSub, Topic, _DefaultResult, Rules) ->
+            peerhost := IpAddress
+           } = Client, PubSub, Topic, _DefaultResult, Rules) ->
     case do_authorize(Client, PubSub, Topic, Rules) of
         {matched, allow} ->
             ?LOG(info, "Client succeeded authorization: Username: ~p, IP: ~p, Topic: ~p, Permission: allow", [Username, IpAddress, Topic]),

+ 99 - 0
apps/emqx_authz/src/emqx_authz_http.erl

@@ -0,0 +1,99 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authz_http).
+
+-include("emqx_authz.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%% AuthZ Callbacks
+-export([ authorize/4
+        , description/0
+        ]).
+
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
+description() ->
+    "AuthZ with http".
+
+authorize(Client, PubSub, Topic,
+            #{resource_id := ResourceID,
+              type := http,
+              config := #{url := #{path := Path} = Url,
+                          headers := Headers,
+                          method := Method,
+                          request_timeout := RequestTimeout} = Config
+             }) ->
+    Request = case Method of
+                  get  -> 
+                      Query = maps:get(query, Url, ""),
+                      Path1 = replvar(Path ++ "?" ++ Query, PubSub, Topic, Client),
+                      {Path1, maps:to_list(Headers)};
+                  _ ->
+                      Body0 = serialize_body(
+                                maps:get('Accept', Headers, <<"application/json">>),
+                                maps:get(body, Config, #{})
+                              ),
+                      Body1 = replvar(Body0, PubSub, Topic, Client),
+                      Path1 = replvar(Path, PubSub, Topic, Client),
+                      {Path1, maps:to_list(Headers), Body1}
+              end,
+    case emqx_resource:query(ResourceID,  {Method, Request, RequestTimeout}) of
+        {ok, 204, _Headers} -> {matched, allow};
+        {ok, 200, _Headers, _Body} -> {matched, allow};
+        _ -> nomatch
+    end.
+
+query_string(Body) ->
+    query_string(maps:to_list(Body), []).
+
+query_string([], Acc) ->
+    <<$&, Str/binary>> = iolist_to_binary(lists:reverse(Acc)),
+    Str;
+query_string([{K, V} | More], Acc) ->
+    query_string(More, [["&", emqx_http_lib:uri_encode(K), "=", emqx_http_lib:uri_encode(V)] | Acc]).
+
+serialize_body(<<"application/json">>, Body) ->
+    jsx:encode(Body);
+serialize_body(<<"application/x-www-form-urlencoded">>, Body) ->
+    query_string(Body).
+
+replvar(Str0, PubSub, Topic,
+        #{username := Username,
+          clientid := Clientid,
+          peerhost := IpAddress,
+          protocol := Protocol,
+          mountpoint := Mountpoint
+         }) when is_list(Str0);
+                 is_binary(Str0) ->
+    NTopic = emqx_http_lib:uri_encode(Topic),
+    Str1 = re:replace(Str0, "%c", Clientid, [global, {return, binary}]),
+    Str2 = re:replace(Str1, "%u", Username, [global, {return, binary}]),
+    Str3 = re:replace(Str2, "%a", inet_parse:ntoa(IpAddress), [global, {return, binary}]),
+    Str4 = re:replace(Str3, "%r", bin(Protocol), [global, {return, binary}]),
+    Str5 = re:replace(Str4, "%m", Mountpoint, [global, {return, binary}]),
+    Str6 = re:replace(Str5, "%t", NTopic, [global, {return, binary}]),
+    Str7 = re:replace(Str6, "%A", bin(PubSub), [global, {return, binary}]),
+    Str7.
+
+bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
+bin(B) when is_binary(B) -> B;
+bin(L) when is_list(L) -> list_to_binary(L);
+bin(X) -> X.

+ 81 - 3
apps/emqx_authz/src/emqx_authz_schema.erl

@@ -4,18 +4,87 @@
 
 -type action() :: publish | subscribe | all.
 -type permission() :: allow | deny.
+-type url() :: emqx_http_lib:uri_map().
 
 -reflect_type([ permission/0
               , action/0
+              , url/0
               ]).
 
--export([structs/0, fields/1]).
+-typerefl_from_string({url/0, emqx_http_lib, uri_parse}).
+
+-export([ structs/0
+        , fields/1
+        ]).
 
 structs() -> ["emqx_authz"].
 
 fields("emqx_authz") ->
     [ {rules, rules()}
     ];
+fields(http) ->
+    [ {principal, principal()}
+    , {type, #{type => http}}
+    , {config, #{type => hoconsc:union([ hoconsc:ref(?MODULE, http_get)
+                                       , hoconsc:ref(?MODULE, http_post)
+                                       ])}
+      }
+    ];
+fields(http_get) ->
+    [ {url, #{type => url()}}
+    , {headers, #{type => map(),
+                  default => #{ <<"accept">> => <<"application/json">>
+                              , <<"cache-control">> => <<"no-cache">>
+                              , <<"connection">> => <<"keep-alive">>
+                              , <<"keep-alive">> => <<"timeout=5">>
+                              },
+                  converter => fun (Headers0) ->
+                                    Headers1 = maps:fold(fun(K0, V, AccIn) ->
+                                                           K1 = iolist_to_binary(string:to_lower(binary_to_list(K0))),
+                                                           maps:put(K1, V, AccIn)
+                                                        end, #{}, Headers0),
+                                    maps:merge(#{ <<"accept">> => <<"application/json">>
+                                                , <<"cache-control">> => <<"no-cache">>
+                                                , <<"connection">> => <<"keep-alive">>
+                                                , <<"keep-alive">> => <<"timeout=5">>
+                                                }, Headers1)
+                               end
+                 }
+      }
+    , {method,  #{type => get,
+                  default => get
+                 }}
+    ]  ++ proplists:delete(base_url, emqx_connector_http:fields(config));
+fields(http_post) ->
+    [ {url, #{type => url()}}
+    , {headers, #{type => map(),
+                  default => #{ <<"accept">> => <<"application/json">>
+                              , <<"cache-control">> => <<"no-cache">>
+                              , <<"connection">> => <<"keep-alive">>
+                              , <<"content-type">> => <<"application/json">>
+                              , <<"keep-alive">> => <<"timeout=5">>
+                              },
+                  converter => fun (Headers0) ->
+                                    Headers1 = maps:fold(fun(K0, V, AccIn) ->
+                                                           K1 = iolist_to_binary(string:to_lower(binary_to_list(K0))),
+                                                           maps:put(K1, V, AccIn)
+                                                        end, #{}, Headers0),
+                                    maps:merge(#{ <<"accept">> => <<"application/json">>
+                                                , <<"cache-control">> => <<"no-cache">>
+                                                , <<"connection">> => <<"keep-alive">>
+                                                , <<"content-type">> => <<"application/json">>
+                                                , <<"keep-alive">> => <<"timeout=5">>
+                                                }, Headers1)
+                               end
+                 }
+      }
+    , {method,  #{type => hoconsc:enum([post, put]),
+                  default => get}}
+    , {body, #{type => map(),
+               nullable => true
+              }
+      }
+    ]  ++ proplists:delete(base_url, emqx_connector_http:fields(config));
 fields(mongo) ->
     connector_fields(mongo) ++
     [ {collection, #{type => atom()}}
@@ -75,9 +144,10 @@ fields(eq_topic) ->
 union_array(Item) when is_list(Item) ->
     hoconsc:array(hoconsc:union(Item)).
 
-rules() -> 
+rules() ->
     #{type => union_array(
                 [ hoconsc:ref(?MODULE, simple_rule)
+                , hoconsc:ref(?MODULE, http)
                 , hoconsc:ref(?MODULE, mysql)
                 , hoconsc:ref(?MODULE, pgsql)
                 , hoconsc:ref(?MODULE, redis)
@@ -108,7 +178,15 @@ query() ->
      }.
 
 connector_fields(DB) ->
-    Mod = list_to_existing_atom(io_lib:format("~s_~s",[emqx_connector, DB])),
+    Mod0 = io_lib:format("~s_~s",[emqx_connector, DB]),
+    Mod = try
+              list_to_existing_atom(Mod0)
+          catch
+              error:badarg ->
+                  list_to_atom(Mod0);
+              Error ->
+                  erlang:error(Error)
+          end,
     [ {principal, principal()}
     , {type, #{type => DB}}
     ] ++ Mod:fields("").

+ 94 - 0
apps/emqx_authz/test/emqx_authz_http_SUITE.erl

@@ -0,0 +1,94 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authz_http_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include("emqx_authz.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("common_test/include/ct.hrl").
+
+all() ->
+    emqx_ct:all(?MODULE).
+
+groups() ->
+    [].
+
+init_per_suite(Config) ->
+    meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
+    meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
+    ok = emqx_ct_helpers:start_apps([emqx_authz], fun set_special_configs/1),
+    Config.
+
+end_per_suite(_Config) ->
+    file:delete(filename:join(emqx:get_env(plugins_etc_dir), 'authz.conf')),
+    emqx_ct_helpers:stop_apps([emqx_authz, emqx_resource]),
+    meck:unload(emqx_resource).
+
+set_special_configs(emqx) ->
+    application:set_env(emqx, allow_anonymous, true),
+    application:set_env(emqx, enable_acl_cache, false),
+    application:set_env(emqx, acl_nomatch, deny),
+    application:set_env(emqx, plugins_loaded_file,
+                        emqx_ct_helpers:deps_path(emqx, "test/loaded_plguins")),
+    ok;
+set_special_configs(emqx_authz) ->
+    Rules = [#{config =>#{
+                 url => #{host => "fake.com",
+                          path => "/",
+                          port => 443,
+                          scheme => https},
+                 headers => #{},
+                 method => get,
+                 request_timeout => 5000
+                },
+               principal => all,
+               type => http}
+            ],
+    emqx_config:put([emqx_authz], #{rules => Rules}),
+    ok;
+set_special_configs(_App) ->
+    ok.
+
+%%------------------------------------------------------------------------------
+%% Testcases
+%%------------------------------------------------------------------------------
+
+t_authz(_) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   protocol => mqtt,
+                   mountpoint => <<"fake">>
+                   },
+
+    meck:expect(emqx_resource, query, fun(_, _) -> {ok, 204, fake_headers} end),
+    ?assertEqual(allow,
+                 emqx_access_control:authorize(ClientInfo, subscribe, <<"#">>)),
+
+    meck:expect(emqx_resource, query, fun(_, _) -> {ok, 200, fake_headers, fake_body} end),
+    ?assertEqual(allow,
+                 emqx_access_control:authorize(ClientInfo, publish, <<"#">>)),
+
+
+    meck:expect(emqx_resource, query, fun(_, _) -> {error, other} end),
+    ?assertEqual(deny,
+        emqx_access_control:authorize(ClientInfo, subscribe, <<"+">>)),
+    ?assertEqual(deny,
+        emqx_access_control:authorize(ClientInfo, publish, <<"+">>)),
+    ok.
+

+ 16 - 24
apps/emqx_connector/src/emqx_connector_http.erl

@@ -28,6 +28,10 @@
         , on_health_check/2
         ]).
 
+-type url() :: emqx_http_lib:uri_map().
+-reflect_type([url/0]).
+-typerefl_from_string({url/0, emqx_http_lib, uri_parse}).
+
 -export([ structs/0
         , fields/1
         , validations/0]).
@@ -53,7 +57,6 @@ fields(config) ->
     , {connect_timeout, fun connect_timeout/1}
     , {max_retries,     fun max_retries/1}
     , {retry_interval,  fun retry_interval/1}
-    , {keepalive,       fun keepalive/1}
     , {pool_type,       fun pool_type/1}
     , {pool_size,       fun pool_size/1}
     , {ssl_opts,        #{type => hoconsc:ref(?MODULE, ssl_opts),
@@ -70,9 +73,12 @@ fields(ssl_opts) ->
 validations() ->
     [ {check_ssl_opts, fun check_ssl_opts/1} ].
 
-base_url(type) -> binary();
+base_url(type) -> url();
 base_url(nullable) -> false;
-base_url(validate) -> [fun check_base_url/1];
+base_url(validate) -> fun (#{query := _Query}) ->
+                              {error, "There must be no query in the base_url"};
+                          (_) -> ok
+                      end;
 base_url(_) -> undefined.
 
 connect_timeout(type) -> connect_timeout();
@@ -87,10 +93,6 @@ retry_interval(type) -> non_neg_integer();
 retry_interval(default) -> 1000;
 retry_interval(_) -> undefined.
 
-keepalive(type) -> non_neg_integer();
-keepalive(default) -> 5000;
-keepalive(_) -> undefined.
-
 pool_type(type) -> pool_type();
 pool_type(default) -> random;
 pool_type(_) -> undefined.
@@ -117,18 +119,16 @@ verify(default) -> false;
 verify(_) -> undefined.
 
 %% ===================================================================
-on_start(InstId, #{url := URL,
+on_start(InstId, #{base_url := #{scheme := Scheme,
+                                 host := Host,
+                                 port := Port,
+                                 path := BasePath},
                    connect_timeout := ConnectTimeout,
                    max_retries := MaxRetries,
                    retry_interval := RetryInterval,
-                   keepalive := Keepalive,
                    pool_type := PoolType,
                    pool_size := PoolSize} = Config) ->
     logger:info("starting http connector: ~p, config: ~p", [InstId, Config]),
-    {ok, #{scheme := Scheme,
-           host := Host,
-           port := Port,
-           path := BasePath}} = emqx_http_lib:uri_parse(URL),
     {Transport, TransportOpts} = case Scheme of
                                      http ->
                                          {tcp, []};
@@ -143,7 +143,7 @@ on_start(InstId, #{url := URL,
                , {connect_timeout, ConnectTimeout}
                , {retry, MaxRetries}
                , {retry_timeout, RetryInterval}
-               , {keepalive, Keepalive}
+               , {keepalive, 5000}
                , {pool_type, PoolType}
                , {pool_size, PoolSize}
                , {transport, Transport}
@@ -192,19 +192,11 @@ on_health_check(_InstId, #{host := Host, port := Port} = State) ->
 %% Internal functions
 %%--------------------------------------------------------------------
 
-check_base_url(URL) ->
-    case emqx_http_lib:uri_parse(URL) of
-        {error, _} -> false;
-        {ok, #{query := _}} -> false;
-        _ -> true
-    end.
-
 check_ssl_opts(Conf) ->
     check_ssl_opts("base_url", Conf).
 
 check_ssl_opts(URLFrom, Conf) ->
-    URL = hocon_schema:get_value(URLFrom, Conf),
-    {ok, #{scheme := Scheme}} = emqx_http_lib:uri_parse(URL),
+    #{schema := Scheme} = hocon_schema:get_value(URLFrom, Conf),
     SSLOpts = hocon_schema:get_value("ssl_opts", Conf),
     case {Scheme, maps:size(SSLOpts)} of
         {http, 0} -> true;
@@ -216,4 +208,4 @@ check_ssl_opts(URLFrom, Conf) ->
 update_path(BasePath, {Path, Headers}) ->
     {filename:join(BasePath, Path), Headers};
 update_path(BasePath, {Path, Headers, Body}) ->
-    {filename:join(BasePath, Path), Headers, Body}.
+    {filename:join(BasePath, Path), Headers, Body}.

+ 2 - 2
rebar.config

@@ -60,8 +60,8 @@
     , {observer_cli, "1.6.1"} % NOTE: depends on recon 2.5.1
     , {getopt, "1.0.1"}
     , {snabbkaffe, {git, "https://github.com/kafka4beam/snabbkaffe.git", {tag, "0.13.0"}}}
-    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.9.6"}}}
-    , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.2.1"}}}
+    , {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.10.3"}}}
+    , {emqx_http_lib, {git, "https://github.com/emqx/emqx_http_lib.git", {tag, "0.3.0"}}}
     , {esasl, {git, "https://github.com/emqx/esasl", {tag, "0.1.0"}}}
     ]}.