Procházet zdrojové kódy

Merge pull request #6446 from savonarola/test-authz-more

Test authz more
Ilya Averyanov před 4 roky
rodič
revize
7cceecc11e

+ 0 - 3
.ci/docker-compose-file/Makefile.local

@@ -16,10 +16,8 @@ up:
 		REDIS_TAG=6 \
 		MONGO_TAG=5 \
 		PGSQL_TAG=13 \
-		LDAP_TAG=2.4.50 \
 	docker-compose \
 		-f .ci/docker-compose-file/docker-compose.yaml \
-		-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
 		-f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \
 		-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
 		-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \
@@ -29,7 +27,6 @@ up:
 down:
 	docker-compose \
 		-f .ci/docker-compose-file/docker-compose.yaml \
-		-f .ci/docker-compose-file/docker-compose-ldap-tcp.yaml \
 		-f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \
 		-f .ci/docker-compose-file/docker-compose-mysql-tcp.yaml \
 		-f .ci/docker-compose-file/docker-compose-pgsql-tcp.yaml \

+ 1 - 1
apps/emqx_authz/include/emqx_authz.hrl

@@ -21,7 +21,7 @@
 
 -define(CONF_KEY_PATH, [authorization, sources]).
 
--define(RE_PLACEHOLDER, "\\$\\{[a-z0-9\\-]+\\}").
+-define(RE_PLACEHOLDER, "\\$\\{[a-z0-9_]+\\}").
 
 -define(USERNAME_RULES_EXAMPLE, #{username => user1,
                                   rules => [ #{topic => <<"test/toopic/1">>,

+ 2 - 2
apps/emqx_authz/src/emqx_authz.erl

@@ -217,10 +217,10 @@ do_post_update({{?CMD_DELETE, Type}, _Source}, _NewSources) ->
     ok = ensure_resource_deleted(OldSource),
     ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Front ++ Rear]}, -1),
     ok = emqx_authz_cache:drain_cache();
-do_post_update(_, NewSources) ->
+do_post_update({?CMD_REPLACE, Sources}, _NewSources) ->
     %% overwrite the entire config!
     OldInitedSources = lookup(),
-    InitedSources = init_sources(NewSources),
+    InitedSources = init_sources(check_sources(Sources)),
     ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [InitedSources]}, -1),
     lists:foreach(fun ensure_resource_deleted/1, OldInitedSources),
     ok = emqx_authz_cache:drain_cache().

+ 1 - 1
apps/emqx_authz/src/emqx_authz_mongodb.erl

@@ -97,7 +97,7 @@ replvar(Selector, #{clientid := Clientid,
                                  , bin(Clientid), [global, {return, binary}]),
                   V2 = re:replace( V1, emqx_authz:ph_to_re(?PH_S_USERNAME)
                                  , bin(Username), [global, {return, binary}]),
-                  V3 = re:replace( V2, emqx_authz:ph_to_re(?PH_S_HOST)
+                  V3 = re:replace( V2, emqx_authz:ph_to_re(?PH_S_PEERHOST)
                                  , inet_parse:ntoa(IpAddress), [global, {return, binary}]),
                   maps:put(K, V3, AccIn);
               InFun(K, V, AccIn) -> maps:put(K, V, AccIn)

+ 11 - 13
apps/emqx_authz/src/emqx_authz_mysql.erl

@@ -53,17 +53,6 @@ dry_run(Source) ->
 destroy(#{annotations := #{id := Id}}) ->
     ok = emqx_resource:remove(Id).
 
-parse_query(undefined) ->
-    undefined;
-parse_query(Sql) ->
-    case re:run(Sql, ?RE_PLACEHOLDER, [global, {capture, all, list}]) of
-        {match, Variables} ->
-            Params = [Var || [Var] <- Variables],
-            {re:replace(Sql, ?RE_PLACEHOLDER, "?", [global, {return, list}]), Params};
-        nomatch ->
-            {Sql, []}
-    end.
-
 authorize(Client, PubSub, Topic,
             #{annotations := #{id := ResourceID,
                                query := {Query, Params}
@@ -80,6 +69,15 @@ authorize(Client, PubSub, Topic,
             nomatch
     end.
 
+parse_query(Sql) ->
+    case re:run(Sql, ?RE_PLACEHOLDER, [global, {capture, all, list}]) of
+        {match, Variables} ->
+            Params = [Var || [Var] <- Variables],
+            {re:replace(Sql, ?RE_PLACEHOLDER, "?", [global, {return, list}]), Params};
+        nomatch ->
+            {Sql, []}
+    end.
+
 do_authorize(_Client, _PubSub, _Topic, _Columns, []) ->
     nomatch;
 do_authorize(Client, PubSub, Topic, Columns, [Row | Tail]) ->
@@ -110,8 +108,8 @@ replvar([], _ClientInfo, Acc) ->
 
 replvar([?PH_S_USERNAME | Params], ClientInfo, Acc) ->
     replvar(Params, ClientInfo, [safe_get(username, ClientInfo) | Acc]);
-replvar([?PH_S_CLIENTID | Params], ClientInfo = #{clientid := ClientId}, Acc) ->
-    replvar(Params, ClientInfo, [ClientId | Acc]);
+replvar([?PH_S_CLIENTID | Params], ClientInfo = #{clientid := _ClientId}, Acc) ->
+    replvar(Params, ClientInfo, [safe_get(clientid, ClientInfo) | Acc]);
 replvar([?PH_S_PEERHOST | Params], ClientInfo = #{peerhost := IpAddr}, Acc) ->
     replvar(Params, ClientInfo, [inet_parse:ntoa(IpAddr) | Acc]);
 replvar([?PH_S_CERT_CN_NAME | Params], ClientInfo, Acc) ->

+ 0 - 2
apps/emqx_authz/src/emqx_authz_postgresql.erl

@@ -53,8 +53,6 @@ destroy(#{annotations := #{id := Id}}) ->
 dry_run(Source) ->
     emqx_resource:create_dry_run(emqx_connector_pgsql, Source).
 
-parse_query(undefined) ->
-    undefined;
 parse_query(Sql) ->
     case re:run(Sql, ?RE_PLACEHOLDER, [global, {capture, all, list}]) of
         {match, Capured} ->

+ 6 - 3
apps/emqx_authz/src/emqx_authz_redis.erl

@@ -70,9 +70,10 @@ authorize(Client, PubSub, Topic,
 do_authorize(_Client, _PubSub, _Topic, []) ->
     nomatch;
 do_authorize(Client, PubSub, Topic, [TopicFilter, Action | Tail]) ->
-    case emqx_authz_rule:match(Client, PubSub, Topic,
-                               emqx_authz_rule:compile({allow, all, Action, [TopicFilter]})
-                              )of
+    case emqx_authz_rule:match(
+           Client, PubSub, Topic,
+           emqx_authz_rule:compile({allow, all, Action, [TopicFilter]})
+          ) of
         {matched, Permission} -> {matched, Permission};
         nomatch -> do_authorize(Client, PubSub, Topic, Tail)
     end.
@@ -81,6 +82,8 @@ replvar(Cmd, Client = #{cn := CN}) ->
     replvar(repl(Cmd, ?PH_S_CERT_CN_NAME, CN), maps:remove(cn, Client));
 replvar(Cmd, Client = #{dn := DN}) ->
     replvar(repl(Cmd, ?PH_S_CERT_SUBJECT, DN), maps:remove(dn, Client));
+replvar(Cmd, Client = #{peerhost := IpAddr}) ->
+    replvar(repl(Cmd, ?PH_S_PEERHOST, inet_parse:ntoa(IpAddr)), maps:remove(peerhost, Client));
 replvar(Cmd, Client = #{clientid := ClientId}) ->
     replvar(repl(Cmd, ?PH_S_CLIENTID, ClientId), maps:remove(clientid, Client));
 replvar(Cmd, Client = #{username := Username}) ->

+ 195 - 90
apps/emqx_authz/test/emqx_authz_mongodb_SUITE.erl

@@ -23,6 +23,10 @@
 -include_lib("common_test/include/ct.hrl").
 -include_lib("emqx/include/emqx_placeholder.hrl").
 
+-define(MONGO_HOST, "mongo").
+-define(MONGO_PORT, 27017).
+-define(MONGO_CLIENT, 'emqx_authz_mongo_SUITE_client').
+
 all() ->
     emqx_common_test_helpers:all(?MODULE).
 
@@ -30,105 +34,206 @@ groups() ->
     [].
 
 init_per_suite(Config) ->
-    meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end),
-    meck:expect(emqx_resource, remove, fun(_) -> ok end ),
-
-    ok = emqx_common_test_helpers:start_apps(
-           [emqx_conf, emqx_authz],
-           fun set_special_configs/1
-          ),
-
-    Rules = [#{<<"type">> => <<"mongodb">>,
-               <<"mongo_type">> => <<"single">>,
-               <<"server">> => <<"127.0.0.1:27017">>,
-               <<"pool_size">> => 1,
-               <<"database">> => <<"mqtt">>,
-               <<"ssl">> => #{<<"enable">> => false},
-               <<"collection">> => <<"fake">>,
-               <<"selector">> => #{<<"a">> => <<"b">>}
-              }],
-    {ok, _} = emqx_authz:update(replace, Rules),
-    Config.
+    case emqx_authz_test_lib:is_tcp_server_available(?MONGO_HOST, ?MONGO_PORT) of
+        true ->
+            ok = emqx_common_test_helpers:start_apps(
+                   [emqx_conf, emqx_authz],
+                   fun set_special_configs/1
+                  ),
+            ok = start_apps([emqx_resource, emqx_connector]),
+            Config;
+        false ->
+            {skip, no_mongo}
+    end.
 
 end_per_suite(_Config) ->
-    {ok, _} = emqx:update_config(
-                [authorization],
-                #{<<"no_match">> => <<"allow">>,
-                  <<"cache">> => #{<<"enable">> => <<"true">>},
-                  <<"sources">> => []}),
-    emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
-    meck:unload(emqx_resource),
-    ok.
+    ok = emqx_authz_test_lib:restore_authorizers(),
+    ok = stop_apps([emqx_resource, emqx_connector]),
+    ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
 
 set_special_configs(emqx_authz) ->
-    {ok, _} = emqx:update_config([authorization, cache, enable], false),
-    {ok, _} = emqx:update_config([authorization, no_match], deny),
-    {ok, _} = emqx:update_config([authorization, sources], []),
-    ok;
-set_special_configs(_App) ->
+    ok = emqx_authz_test_lib:reset_authorizers();
+
+set_special_configs(_) ->
     ok.
 
--define(SOURCE1,[#{<<"topics">> => [<<"#">>],
-                 <<"permission">> => <<"deny">>,
-                 <<"action">> => <<"all">>}]).
--define(SOURCE2,[#{<<"topics">> => [<<"eq #">>],
-                 <<"permission">> => <<"allow">>,
-                 <<"action">> => <<"all">>}]).
--define(SOURCE3,[#{<<"topics">> => [<<"test/", ?PH_CLIENTID/binary>>],
-                 <<"permission">> => <<"allow">>,
-                 <<"action">> => <<"subscribe">>}]).
--define(SOURCE4,[#{<<"topics">> => [<<"test/", ?PH_USERNAME/binary>>],
-                 <<"permission">> => <<"allow">>,
-                 <<"action">> => <<"publish">>}]).
+init_per_testcase(_TestCase, Config) ->
+    {ok, _} = mc_worker_api:connect(mongo_config()),
+    ok = emqx_authz_test_lib:reset_authorizers(),
+    Config.
+
+end_per_testcase(_TestCase, _Config) ->
+    ok = reset_samples(),
+    ok = mc_worker_api:disconnect(?MONGO_CLIENT).
 
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_authz(_) ->
-    ClientInfo1 = #{clientid => <<"test">>,
-                    username => <<"test">>,
-                    peerhost => {127,0,0,1},
-                    zone => default,
-                    listener => {tcp, default}
-                   },
-    ClientInfo2 = #{clientid => <<"test_clientid">>,
-                    username => <<"test_username">>,
-                    peerhost => {192,168,0,10},
-                    zone => default,
-                    listener => {tcp, default}
-                   },
-    ClientInfo3 = #{clientid => <<"test_clientid">>,
-                    username => <<"fake_username">>,
-                    peerhost => {127,0,0,1},
-                    zone => default,
-                    listener => {tcp, default}
-                   },
-
-    meck:expect(emqx_resource, query, fun(_, _) -> [] end),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), % nomatch
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)), % nomatch
-
-    meck:expect(emqx_resource, query, fun(_, _) -> ?SOURCE1 ++ ?SOURCE2 end),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"+">>)),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"+">>)),
-
-    meck:expect(emqx_resource, query, fun(_, _) -> ?SOURCE2 ++ ?SOURCE1 end),
-    ?assertEqual(allow, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"+">>)),
-
-    meck:expect(emqx_resource, query, fun(_, _) -> ?SOURCE3 ++ ?SOURCE4 end),
-    ?assertEqual(allow, emqx_access_control:authorize(
-                          ClientInfo2, subscribe, <<"test/test_clientid">>)),
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo2, publish,   <<"test/test_clientid">>)),
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo2, subscribe, <<"test/test_username">>)),
-    ?assertEqual(allow, emqx_access_control:authorize(
-                          ClientInfo2, publish,   <<"test/test_username">>)),
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo3, subscribe, <<"test">>)), % nomatch
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo3, publish,   <<"test">>)), % nomatch
+t_topic_rules(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ok = emqx_authz_test_lib:test_no_topic_rules(ClientInfo, fun setup_client_samples/2),
+
+    ok = emqx_authz_test_lib:test_allow_topic_rules(ClientInfo, fun setup_client_samples/2),
+
+    ok = emqx_authz_test_lib:test_deny_topic_rules(ClientInfo, fun setup_client_samples/2).
+
+t_complex_selector(_) ->
+    %% atom and string values also supported
+    ClientInfo = #{clientid => clientid,
+                   username => "username",
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    Samples = [#{<<"x">> => #{<<"u">> => <<"username">>,
+                              <<"c">> => [#{<<"c">> => <<"clientid">>}],
+                              <<"y">> => 1},
+                 <<"permission">> => <<"allow">>,
+                 <<"action">> => <<"publish">>,
+                 <<"topics">> => [<<"t">>]
+                }],
+
+    ok = setup_samples(Samples),
+    ok = setup_config(
+           #{<<"selector">> => #{<<"x">> => #{<<"u">> => <<"${username}">>,
+                                              <<"c">> => [#{<<"c">> => <<"${clientid}">>}],
+                                              <<"y">> => 1}
+                                }
+            }),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, publish, <<"t">>}]).
+
+t_mongo_error(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ok = setup_samples([]),
+    ok = setup_config(
+           #{<<"selector">> => #{<<"$badoperator">> => <<"$badoperator">>}}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{deny, publish, <<"t">>}]).
+
+t_lookups(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   cn => <<"cn">>,
+                   dn => <<"dn">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ByClientid = #{<<"clientid">> => <<"clientid">>,
+                   <<"topics">> => [<<"a">>],
+                   <<"action">> => <<"all">>,
+                   <<"permission">> => <<"allow">>},
+
+    ok = setup_samples([ByClientid]),
+    ok = setup_config(
+           #{<<"selector">> => #{<<"clientid">> => <<"${clientid}">>}}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+    ByPeerhost = #{<<"peerhost">> => <<"127.0.0.1">>,
+                   <<"topics">> => [<<"a">>],
+                   <<"action">> => <<"all">>,
+                   <<"permission">> => <<"allow">>},
+
+    ok = setup_samples([ByPeerhost]),
+    ok = setup_config(
+           #{<<"selector">> => #{<<"peerhost">> => <<"${peerhost}">>}}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]).
+
+%%------------------------------------------------------------------------------
+%% Helpers
+%%------------------------------------------------------------------------------
+
+populate_records(AclRecords, AdditionalData) ->
+    [maps:merge(Record, AdditionalData) || Record <- AclRecords].
+
+setup_samples(AclRecords) ->
+    ok = reset_samples(),
+    {{true, _}, _} = mc_worker_api:insert(?MONGO_CLIENT, <<"acl">>, AclRecords),
     ok.
+
+setup_client_samples(ClientInfo, Samples) ->
+    #{username := Username} = ClientInfo,
+    Records = lists:map(
+                fun(Sample) ->
+                        #{topics := Topics,
+                          permission := Permission,
+                          action := Action} = Sample,
+
+                        #{<<"topics">> => Topics,
+                          <<"permission">> => Permission,
+                          <<"action">> => Action,
+                          <<"username">> => Username}
+                end,
+                Samples),
+    setup_samples(Records),
+    setup_config(#{<<"selector">> => #{<<"username">> => <<"${username}">>}}).
+
+reset_samples() ->
+    {true, _} = mc_worker_api:delete(?MONGO_CLIENT, <<"acl">>, #{}),
+    ok.
+
+setup_config(SpecialParams) ->
+    emqx_authz_test_lib:setup_config(
+      raw_mongo_authz_config(),
+      SpecialParams).
+
+raw_mongo_authz_config() ->
+    #{
+        <<"type">> => <<"mongodb">>,
+        <<"enable">> => <<"true">>,
+
+        <<"mongo_type">> => <<"single">>,
+        <<"database">> => <<"mqtt">>,
+        <<"collection">> => <<"acl">>,
+        <<"server">> => mongo_server(),
+
+        <<"selector">> => #{<<"username">> => <<"${username}">>}
+    }.
+
+mongo_server() ->
+    iolist_to_binary(
+      io_lib:format(
+        "~s:~b",
+        [?MONGO_HOST, ?MONGO_PORT])).
+
+mongo_config() ->
+    [
+     {database, <<"mqtt">>},
+     {host, ?MONGO_HOST},
+     {port, ?MONGO_PORT},
+     {register, ?MONGO_CLIENT}
+    ].
+
+start_apps(Apps) ->
+    lists:foreach(fun application:ensure_all_started/1, Apps).
+
+stop_apps(Apps) ->
+    lists:foreach(fun application:stop/1, Apps).

+ 254 - 90
apps/emqx_authz/test/emqx_authz_mysql_SUITE.erl

@@ -21,9 +21,11 @@
 -include("emqx_authz.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
--include_lib("emqx/include/emqx_placeholder.hrl").
 
--define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
+
+-define(MYSQL_HOST, "mysql").
+-define(MYSQL_PORT, 3306).
+-define(MYSQL_RESOURCE, <<"emqx_authz_mysql_SUITE">>).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -32,101 +34,263 @@ groups() ->
     [].
 
 init_per_suite(Config) ->
-    meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
-    meck:expect(emqx_resource, remove, fun(_) -> ok end ),
-
-    ok = emqx_common_test_helpers:start_apps(
-           [emqx_conf, emqx_authz],
-           fun set_special_configs/1),
-
-    Rules = [#{<<"type">> => <<"mysql">>,
-               <<"server">> => <<"127.0.0.1:27017">>,
-               <<"pool_size">> => 1,
-               <<"database">> => <<"mqtt">>,
-               <<"username">> => <<"xx">>,
-               <<"password">> => <<"ee">>,
-               <<"auto_reconnect">> => true,
-               <<"ssl">> => #{<<"enable">> => false},
-               <<"query">> => <<"abcb">>
-              }],
-    {ok, _} = emqx_authz:update(replace, Rules),
-    Config.
+    case emqx_authn_test_lib:is_tcp_server_available(?MYSQL_HOST, ?MYSQL_PORT) of
+        true ->
+            ok = emqx_common_test_helpers:start_apps(
+                   [emqx_conf, emqx_authz],
+                   fun set_special_configs/1
+                  ),
+            ok = start_apps([emqx_resource, emqx_connector]),
+            {ok, _} = emqx_resource:create_local(
+              ?MYSQL_RESOURCE,
+              emqx_connector_mysql,
+              mysql_config()),
+            Config;
+        false ->
+            {skip, no_mysql}
+    end.
 
 end_per_suite(_Config) ->
-    {ok, _} = emqx:update_config(
-                [authorization],
-                #{<<"no_match">> => <<"allow">>,
-                  <<"cache">> => #{<<"enable">> => <<"true">>},
-                  <<"sources">> => []}),
-    emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
-    meck:unload(emqx_resource),
-    ok.
+    ok = emqx_authz_test_lib:restore_authorizers(),
+    ok = emqx_resource:remove_local(?MYSQL_RESOURCE),
+    ok = stop_apps([emqx_resource, emqx_connector]),
+    ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
+
+init_per_testcase(Config) ->
+    ok = emqx_authz_test_lib:reset_authorizers(),
+    Config.
 
 set_special_configs(emqx_authz) ->
-    {ok, _} = emqx:update_config([authorization, cache, enable], false),
-    {ok, _} = emqx:update_config([authorization, no_match], deny),
-    {ok, _} = emqx:update_config([authorization, sources], []),
-    ok;
-set_special_configs(_App) ->
-    ok.
+    ok = emqx_authz_test_lib:reset_authorizers();
 
--define(COLUMNS, [ <<"action">>
-                 , <<"permission">>
-                 , <<"topic">>
-                 ]).
--define(SOURCE1, [[<<"all">>, <<"deny">>, <<"#">>]]).
--define(SOURCE2, [[<<"all">>, <<"allow">>, <<"eq #">>]]).
--define(SOURCE3, [[<<"subscribe">>, <<"allow">>, <<"test/", ?PH_CLIENTID/binary>>]]).
--define(SOURCE4, [[<<"publish">>, <<"allow">>, <<"test/", ?PH_USERNAME/binary>>]]).
+set_special_configs(_) ->
+    ok.
 
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_authz(_) ->
-    ClientInfo1 = #{clientid => <<"test">>,
-                    username => <<"test">>,
-                    peerhost => {127,0,0,1},
-                    zone => default,
-                    listener => {tcp, default}
-                   },
-    ClientInfo2 = #{clientid => <<"test_clientid">>,
-                    username => <<"test_username">>,
-                    peerhost => {192,168,0,10},
-                    zone => default,
-                    listener => {tcp, default}
-                   },
-    ClientInfo3 = #{clientid => <<"test_clientid">>,
-                    username => <<"fake_username">>,
-                    peerhost => {127,0,0,1},
-                    zone => default,
-                    listener => {tcp, default}
-                   },
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, []} end),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), % nomatch
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)), % nomatch
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE1 ++ ?SOURCE2} end),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"+">>)),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"+">>)),
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE2 ++ ?SOURCE1} end),
-    ?assertEqual(allow, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"+">>)),
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE3 ++ ?SOURCE4} end),
-    ?assertEqual(allow, emqx_access_control:authorize(
-                          ClientInfo2, subscribe, <<"test/test_clientid">>)),
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo2, publish,   <<"test/test_clientid">>)),
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo2, subscribe, <<"test/test_username">>)),
-    ?assertEqual(allow, emqx_access_control:authorize(
-                          ClientInfo2, publish,   <<"test/test_username">>)),
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo3, subscribe, <<"test">>)), % nomatch
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo3, publish,   <<"test">>)), % nomatch
-    ok.
+t_topic_rules(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ok = emqx_authz_test_lib:test_no_topic_rules(ClientInfo, fun setup_client_samples/2),
+
+    ok = emqx_authz_test_lib:test_allow_topic_rules(ClientInfo, fun setup_client_samples/2),
+
+    ok = emqx_authz_test_lib:test_deny_topic_rules(ClientInfo, fun setup_client_samples/2).
+
+
+t_lookups(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   cn => <<"cn">>,
+                   dn => <<"dn">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    %% by clientid
+
+    ok = init_table(),
+    ok = q(<<"INSERT INTO acl(clientid, topic, permission, action)"
+             "VALUES(?, ?, ?, ?)">>,
+           [<<"clientid">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE clientid = ${clientid}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+    %% by peerhost
+
+    ok = init_table(),
+    ok = q(<<"INSERT INTO acl(peerhost, topic, permission, action)"
+             "VALUES(?, ?, ?, ?)">>,
+           [<<"127.0.0.1">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE peerhost = ${peerhost}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+    %% by cn
+
+    ok = init_table(),
+    ok = q(<<"INSERT INTO acl(cn, topic, permission, action)"
+             "VALUES(?, ?, ?, ?)">>,
+           [<<"cn">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE cn = ${cert_common_name}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+    %% by dn
+
+    ok = init_table(),
+    ok = q(<<"INSERT INTO acl(dn, topic, permission, action)"
+             "VALUES(?, ?, ?, ?)">>,
+           [<<"dn">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE dn = ${cert_subject}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]).
+
+t_mysql_error(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ok = setup_config(
+      #{<<"query">> => <<"SOME INVALID STATEMENT">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{deny, subscribe, <<"a">>}]).
+
+
+t_create_invalid(_Config) ->
+    BadConfig = maps:merge(
+                  raw_mysql_authz_config(),
+                  #{<<"server">> => <<"255.255.255.255:33333">>}),
+    {error, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]),
+
+    [] = emqx_authz:lookup().
+
+t_nonbinary_values(_Config) ->
+    ClientInfo = #{clientid => clientid,
+                   username => "username",
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+
+    ok = init_table(),
+    ok = q(<<"INSERT INTO acl(clientid, username, topic, permission, action)"
+             "VALUES(?, ?, ?, ?, ?)">>,
+           [<<"clientid">>, <<"username">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE clientid = ${clientid} AND username = ${username}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]).
+
+%%------------------------------------------------------------------------------
+%% Helpers
+%%------------------------------------------------------------------------------
+
+raw_mysql_authz_config() ->
+    #{
+        <<"enable">> => <<"true">>,
+
+        <<"type">> => <<"mysql">>,
+        <<"database">> => <<"mqtt">>,
+        <<"username">> => <<"root">>,
+        <<"password">> => <<"public">>,
+
+        <<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE username = ${username}">>,
+
+        <<"server">> => mysql_server()
+    }.
+
+q(Sql) ->
+    emqx_resource:query(
+      ?MYSQL_RESOURCE,
+      {sql, Sql}).
+
+q(Sql, Params) ->
+    emqx_resource:query(
+      ?MYSQL_RESOURCE,
+      {sql, Sql, Params}).
+
+init_table() ->
+    ok = drop_table(),
+    ok = q("CREATE TABLE acl(
+                       username VARCHAR(255),
+                       clientid VARCHAR(255),
+                       peerhost VARCHAR(255),
+                       cn VARCHAR(255),
+                       dn VARCHAR(255),
+                       topic VARCHAR(255),
+                       permission VARCHAR(255),
+                       action VARCHAR(255))").
+
+drop_table() ->
+    ok = q("DROP TABLE IF EXISTS acl").
+
+setup_client_samples(ClientInfo, Samples) ->
+    #{username := Username} = ClientInfo,
+    ok = init_table(),
+    ok = lists:foreach(
+           fun(#{topics := Topics, permission := Permission, action := Action}) ->
+                   lists:foreach(
+                     fun(Topic) ->
+                             q(<<"INSERT INTO acl(username, topic, permission, action)"
+                                 "VALUES(?, ?, ?, ?)">>,
+                               [Username, Topic, Permission, Action])
+                     end,
+                     Topics)
+           end,
+           Samples),
+    setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE username = ${username}">>}).
+
+setup_config(SpecialParams) ->
+    emqx_authz_test_lib:setup_config(
+      raw_mysql_authz_config(),
+      SpecialParams).
+
+mysql_server() ->
+    iolist_to_binary(
+      io_lib:format(
+        "~s:~b",
+        [?MYSQL_HOST, ?MYSQL_PORT])).
+
+mysql_config() ->
+    #{auto_reconnect => true,
+      database => <<"mqtt">>,
+      username => <<"root">>,
+      password => <<"public">>,
+      pool_size => 8,
+      server => {?MYSQL_HOST, ?MYSQL_PORT},
+      ssl => #{enable => false}
+     }.
+
+start_apps(Apps) ->
+    lists:foreach(fun application:ensure_all_started/1, Apps).
+
+stop_apps(Apps) ->
+    lists:foreach(fun application:stop/1, Apps).

+ 257 - 88
apps/emqx_authz/test/emqx_authz_postgresql_SUITE.erl

@@ -21,7 +21,11 @@
 -include("emqx_authz.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
--include_lib("emqx/include/emqx_placeholder.hrl").
+
+
+-define(PGSQL_HOST, "pgsql").
+-define(PGSQL_PORT, 5432).
+-define(PGSQL_RESOURCE, <<"emqx_authz_pgsql_SUITE">>).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -30,101 +34,266 @@ groups() ->
     [].
 
 init_per_suite(Config) ->
-    meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
-    meck:expect(emqx_resource, remove, fun(_) -> ok end ),
-
-   ok = emqx_common_test_helpers:start_apps(
-           [emqx_conf, emqx_authz],
-           fun set_special_configs/1),
-
-    Rules = [#{<<"type">> => <<"postgresql">>,
-               <<"server">> => <<"127.0.0.1:27017">>,
-               <<"pool_size">> => 1,
-               <<"database">> => <<"mqtt">>,
-               <<"username">> => <<"xx">>,
-               <<"password">> => <<"ee">>,
-               <<"auto_reconnect">> => true,
-               <<"ssl">> => #{<<"enable">> => false},
-               <<"query">> => <<"abcb">>
-              }],
-    {ok, _} = emqx_authz:update(replace, Rules),
-    Config.
+    case emqx_authn_test_lib:is_tcp_server_available(?PGSQL_HOST, ?PGSQL_PORT) of
+        true ->
+            ok = emqx_common_test_helpers:start_apps(
+                   [emqx_conf, emqx_authz],
+                   fun set_special_configs/1
+                  ),
+            ok = start_apps([emqx_resource, emqx_connector]),
+            {ok, _} = emqx_resource:create_local(
+              ?PGSQL_RESOURCE,
+              emqx_connector_pgsql,
+              pgsql_config()),
+            Config;
+        false ->
+            {skip, no_pgsql}
+    end.
 
 end_per_suite(_Config) ->
-    {ok, _} = emqx:update_config(
-                [authorization],
-                #{<<"no_match">> => <<"allow">>,
-                  <<"cache">> => #{<<"enable">> => <<"true">>},
-                  <<"sources">> => []}),
-    emqx_common_test_helpers:stop_apps([emqx_authz, emqx_conf]),
-    meck:unload(emqx_resource),
-    ok.
+    ok = emqx_authz_test_lib:restore_authorizers(),
+    ok = emqx_resource:remove_local(?PGSQL_RESOURCE),
+    ok = stop_apps([emqx_resource, emqx_connector]),
+    ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
+
+init_per_testcase(Config) ->
+    ok = emqx_authz_test_lib:reset_authorizers(),
+    Config.
 
 set_special_configs(emqx_authz) ->
-    {ok, _} = emqx:update_config([authorization, cache, enable], false),
-    {ok, _} = emqx:update_config([authorization, no_match], deny),
-    {ok, _} = emqx:update_config([authorization, sources], []),
-    ok;
-set_special_configs(_App) ->
-    ok.
+    ok = emqx_authz_test_lib:reset_authorizers();
 
--define(COLUMNS, [ {column, <<"action">>, meck, meck, meck, meck, meck, meck, meck}
-                 , {column, <<"permission">>, meck, meck, meck, meck, meck, meck, meck}
-                 , {column, <<"topic">>, meck, meck, meck, meck, meck, meck, meck}
-                 ]).
--define(SOURCE1, [{<<"all">>, <<"deny">>, <<"#">>}]).
--define(SOURCE2, [{<<"all">>, <<"allow">>, <<"eq #">>}]).
--define(SOURCE3, [{<<"subscribe">>, <<"allow">>, <<"test/", ?PH_CLIENTID/binary>>}]).
--define(SOURCE4, [{<<"publish">>, <<"allow">>, <<"test/", ?PH_USERNAME/binary>>}]).
+set_special_configs(_) ->
+    ok.
 
 %%------------------------------------------------------------------------------
 %% Testcases
 %%------------------------------------------------------------------------------
 
-t_authz(_) ->
-    ClientInfo1 = #{clientid => <<"test">>,
-                    username => <<"test">>,
-                    peerhost => {127,0,0,1},
-                    zone => default,
-                    listener => {tcp, default}
-                   },
-    ClientInfo2 = #{clientid => <<"test_clientid">>,
-                    username => <<"test_username">>,
-                    peerhost => {192,168,0,10},
-                    zone => default,
-                    listener => {tcp, default}
-                   },
-    ClientInfo3 = #{clientid => <<"test_clientid">>,
-                    username => <<"fake_username">>,
-                    peerhost => {127,0,0,1},
-                    zone => default,
-                    listener => {tcp, default}
-                   },
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, []} end),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)), % nomatch
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"#">>)), % nomatch
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE1 ++ ?SOURCE2} end),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, subscribe, <<"+">>)),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo1, publish, <<"+">>)),
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE2 ++ ?SOURCE1} end),
-    ?assertEqual(allow, emqx_access_control:authorize(ClientInfo1, subscribe, <<"#">>)),
-    ?assertEqual(deny, emqx_access_control:authorize(ClientInfo2, subscribe, <<"+">>)),
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?COLUMNS, ?SOURCE3 ++ ?SOURCE4} end),
-    ?assertEqual(allow, emqx_access_control:authorize(
-                          ClientInfo2, subscribe, <<"test/test_clientid">>)),
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo2, publish,   <<"test/test_clientid">>)),
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo2, subscribe, <<"test/test_username">>)),
-    ?assertEqual(allow, emqx_access_control:authorize(
-                          ClientInfo2, publish,   <<"test/test_username">>)),
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo3, subscribe, <<"test">>)), % nomatch
-    ?assertEqual(deny,  emqx_access_control:authorize(
-                          ClientInfo3, publish,   <<"test">>)), % nomatch
+t_topic_rules(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ok = emqx_authz_test_lib:test_no_topic_rules(ClientInfo, fun setup_client_samples/2),
+
+    ok = emqx_authz_test_lib:test_allow_topic_rules(ClientInfo, fun setup_client_samples/2),
+
+    ok = emqx_authz_test_lib:test_deny_topic_rules(ClientInfo, fun setup_client_samples/2).
+
+
+t_lookups(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   cn => <<"cn">>,
+                   dn => <<"dn">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    %% by clientid
+
+    ok = init_table(),
+    ok = insert(<<"INSERT INTO acl(clientid, topic, permission, action)"
+                  "VALUES($1, $2, $3, $4)">>,
+                [<<"clientid">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE clientid = ${clientid}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+    %% by peerhost
+
+    ok = init_table(),
+    ok = insert(<<"INSERT INTO acl(peerhost, topic, permission, action)"
+                  "VALUES($1, $2, $3, $4)">>,
+                [<<"127.0.0.1">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE peerhost = ${peerhost}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+    %% by cn
+
+    ok = init_table(),
+    ok = insert(<<"INSERT INTO acl(cn, topic, permission, action)"
+                  "VALUES($1, $2, $3, $4)">>,
+                [<<"cn">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE cn = ${cert_common_name}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+    %% by dn
+
+    ok = init_table(),
+    ok = insert(<<"INSERT INTO acl(dn, topic, permission, action)"
+                  "VALUES($1, $2, $3, $4)">>,
+                [<<"dn">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE dn = ${cert_subject}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]).
+
+t_pgsql_error(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ok = setup_config(
+      #{<<"query">> => <<"SOME INVALID STATEMENT">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{deny, subscribe, <<"a">>}]).
+
+
+t_create_invalid(_Config) ->
+    BadConfig = maps:merge(
+                  raw_pgsql_authz_config(),
+                  #{<<"server">> => <<"255.255.255.255:33333">>}),
+    {error, _} = emqx_authz:update(?CMD_REPLACE, [BadConfig]),
+
+    [] = emqx_authz:lookup().
+
+t_nonbinary_values(_Config) ->
+    ClientInfo = #{clientid => clientid,
+                   username => "username",
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+
+    ok = init_table(),
+    ok = insert(<<"INSERT INTO acl(clientid, username, topic, permission, action)"
+                  "VALUES($1, $2, $3, $4, $5)">>,
+                [<<"clientid">>, <<"username">>, <<"a">>, <<"allow">>, <<"subscribe">>]),
+
+    ok = setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE clientid = ${clientid} AND username = ${username}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]).
+
+%%------------------------------------------------------------------------------
+%% Helpers
+%%------------------------------------------------------------------------------
+
+raw_pgsql_authz_config() ->
+    #{
+        <<"enable">> => <<"true">>,
+
+        <<"type">> => <<"postgresql">>,
+        <<"database">> => <<"mqtt">>,
+        <<"username">> => <<"root">>,
+        <<"password">> => <<"public">>,
+
+        <<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE username = ${username}">>,
+
+        <<"server">> => pgsql_server()
+    }.
+
+q(Sql) ->
+    emqx_resource:query(
+      ?PGSQL_RESOURCE,
+      {sql, Sql}).
+
+insert(Sql, Params) ->
+    {ok, _} = emqx_resource:query(
+                ?PGSQL_RESOURCE,
+                {sql, Sql, Params}),
+    ok.
+
+init_table() ->
+    ok = drop_table(),
+    {ok, _, _} = q("CREATE TABLE acl(
+                       username VARCHAR(255),
+                       clientid VARCHAR(255),
+                       peerhost VARCHAR(255),
+                       cn VARCHAR(255),
+                       dn VARCHAR(255),
+                       topic VARCHAR(255),
+                       permission VARCHAR(255),
+                       action VARCHAR(255))"),
+    ok.
+
+drop_table() ->
+    {ok, _, _} = q("DROP TABLE IF EXISTS acl"),
     ok.
+
+setup_client_samples(ClientInfo, Samples) ->
+    #{username := Username} = ClientInfo,
+    ok = init_table(),
+    ok = lists:foreach(
+           fun(#{topics := Topics, permission := Permission, action := Action}) ->
+                   lists:foreach(
+                     fun(Topic) ->
+                             insert(<<"INSERT INTO acl(username, topic, permission, action)"
+                                      "VALUES($1, $2, $3, $4)">>,
+                                    [Username, Topic, Permission, Action])
+                     end,
+                     Topics)
+           end,
+           Samples),
+    setup_config(
+      #{<<"query">> => <<"SELECT permission, action, topic "
+                         "FROM acl WHERE username = ${username}">>}).
+
+setup_config(SpecialParams) ->
+    emqx_authz_test_lib:setup_config(
+      raw_pgsql_authz_config(),
+      SpecialParams).
+
+pgsql_server() ->
+    iolist_to_binary(
+      io_lib:format(
+        "~s:~b",
+        [?PGSQL_HOST, ?PGSQL_PORT])).
+
+pgsql_config() ->
+    #{auto_reconnect => true,
+      database => <<"mqtt">>,
+      username => <<"root">>,
+      password => <<"public">>,
+      pool_size => 8,
+      server => {?PGSQL_HOST, ?PGSQL_PORT},
+      ssl => #{enable => false}
+     }.
+
+start_apps(Apps) ->
+    lists:foreach(fun application:ensure_all_started/1, Apps).
+
+stop_apps(Apps) ->
+    lists:foreach(fun application:stop/1, Apps).

+ 200 - 70
apps/emqx_authz/test/emqx_authz_redis_SUITE.erl

@@ -4,7 +4,8 @@
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
 %% You may obtain a copy of the License at
-%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
 %%
 %% Unless required by applicable law or agreed to in writing, software
 %% distributed under the License is distributed on an "AS IS" BASIS,
@@ -21,8 +22,11 @@
 -include("emqx_authz.hrl").
 -include_lib("eunit/include/eunit.hrl").
 -include_lib("common_test/include/ct.hrl").
--include_lib("emqx/include/emqx_placeholder.hrl").
--define(CONF_DEFAULT, <<"authorization: {sources: []}">>).
+
+
+-define(REDIS_HOST, "redis").
+-define(REDIS_PORT, 6379).
+-define(REDIS_RESOURCE, <<"emqx_authz_redis_SUITE">>).
 
 all() ->
     emqx_common_test_helpers:all(?MODULE).
@@ -31,86 +35,212 @@ groups() ->
     [].
 
 init_per_suite(Config) ->
-    meck:new(emqx_resource, [non_strict, passthrough, no_history, no_link]),
-    meck:expect(emqx_resource, create, fun(_, _, _) -> {ok, meck_data} end ),
-    meck:expect(emqx_resource, remove, fun(_) -> ok end ),
-
-    ok = emqx_common_test_helpers:start_apps(
-           [emqx_conf, emqx_authz],
-           fun set_special_configs/1),
-
-    Rules = [#{<<"type">> => <<"redis">>,
-               <<"server">> => <<"127.0.0.1:27017">>,
-               <<"pool_size">> => 1,
-               <<"database">> => 0,
-               <<"password">> => <<"ee">>,
-               <<"auto_reconnect">> => true,
-               <<"ssl">> => #{<<"enable">> => false},
-               <<"cmd">> => <<"HGETALL mqtt_authz:", ?PH_USERNAME/binary>>
-              }],
-    {ok, _} = emqx_authz:update(replace, Rules),
-    Config.
+    case emqx_authn_test_lib:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of
+        true ->
+            ok = emqx_common_test_helpers:start_apps(
+                   [emqx_conf, emqx_authz],
+                   fun set_special_configs/1
+                  ),
+            ok = start_apps([emqx_resource, emqx_connector]),
+            {ok, _} = emqx_resource:create_local(
+              ?REDIS_RESOURCE,
+              emqx_connector_redis,
+              redis_config()),
+            Config;
+        false ->
+            {skip, no_redis}
+    end.
 
 end_per_suite(_Config) ->
-    {ok, _} = emqx:update_config(
-                [authorization],
-                #{<<"no_match">> => <<"allow">>,
-                  <<"cache">> => #{<<"enable">> => <<"true">>},
-                  <<"sources">> => []}),
-    emqx_common_test_helpers:stop_apps([emqx_authz, emqx_resource]),
-    meck:unload(emqx_resource),
-    ok.
+    ok = emqx_authz_test_lib:restore_authorizers(),
+    ok = emqx_resource:remove_local(?REDIS_RESOURCE),
+    ok = stop_apps([emqx_resource, emqx_connector]),
+    ok = emqx_common_test_helpers:stop_apps([emqx_authz]).
+
+init_per_testcase(Config) ->
+    ok = emqx_authz_test_lib:reset_authorizers(),
+    Config.
 
 set_special_configs(emqx_authz) ->
-    {ok, _} = emqx:update_config([authorization, cache, enable], false),
-    {ok, _} = emqx:update_config([authorization, no_match], deny),
-    {ok, _} = emqx:update_config([authorization, sources], []),
-    ok;
-set_special_configs(_App) ->
+    ok = emqx_authz_test_lib:reset_authorizers();
+
+set_special_configs(_) ->
     ok.
 
--define(SOURCE1, [<<"test/", ?PH_USERNAME/binary>>, <<"publish">>]).
--define(SOURCE2, [<<"test/", ?PH_CLIENTID/binary>>, <<"publish">>]).
--define(SOURCE3, [<<"#">>, <<"subscribe">>]).
 
 %%------------------------------------------------------------------------------
-%% Testcases
+%% Tests
 %%------------------------------------------------------------------------------
 
-t_authz(_) ->
+t_topic_rules(_Config) ->
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    ok = emqx_authz_test_lib:test_no_topic_rules(ClientInfo, fun setup_client_samples/2),
+
+    ok = emqx_authz_test_lib:test_allow_topic_rules(ClientInfo, fun setup_client_samples/2).
+
+
+t_lookups(_Config) ->
     ClientInfo = #{clientid => <<"clientid">>,
+                   cn => <<"cn">>,
+                   dn => <<"dn">>,
                    username => <<"username">>,
                    peerhost => {127,0,0,1},
                    zone => default,
                    listener => {tcp, default}
-                   },
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, []} end),
-    % nomatch
-    ?assertEqual(deny,
-                 emqx_access_control:authorize(ClientInfo, subscribe, <<"#">>)),
-    ?assertEqual(deny,
-                 emqx_access_control:authorize(ClientInfo, publish, <<"#">>)),
-
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?SOURCE1 ++ ?SOURCE2} end),
-    % nomatch
-    ?assertEqual(deny,
-        emqx_access_control:authorize(ClientInfo, subscribe, <<"+">>)),
-    % nomatch
-    ?assertEqual(deny,
-        emqx_access_control:authorize(ClientInfo, subscribe, <<"test/username">>)),
-
-    ?assertEqual(allow,
-        emqx_access_control:authorize(ClientInfo, publish, <<"test/clientid">>)),
-    ?assertEqual(allow,
-        emqx_access_control:authorize(ClientInfo, publish, <<"test/clientid">>)),
-
-    meck:expect(emqx_resource, query, fun(_, _) -> {ok, ?SOURCE3} end),
-
-    ?assertEqual(allow,
-        emqx_access_control:authorize(ClientInfo, subscribe, <<"#">>)),
-    % nomatch
-    ?assertEqual(deny,
-        emqx_access_control:authorize(ClientInfo, publish, <<"#">>)),
+                  },
+
+    ByClientid = #{<<"mqtt_user:clientid">> =>
+                   #{<<"a">> => <<"all">>}},
+
+    ok = setup_sample(ByClientid),
+    ok = setup_config(#{<<"cmd">> => <<"HGETALL mqtt_user:${clientid}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+    ByPeerhost = #{<<"mqtt_user:127.0.0.1">> =>
+                   #{<<"a">> => <<"all">>}},
+
+    ok = setup_sample(ByPeerhost),
+    ok = setup_config(#{<<"cmd">> => <<"HGETALL mqtt_user:${peerhost}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+    ByCN = #{<<"mqtt_user:cn">> =>
+             #{<<"a">> => <<"all">>}},
+
+    ok = setup_sample(ByCN),
+    ok = setup_config(#{<<"cmd">> => <<"HGETALL mqtt_user:${cert_common_name}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]),
+
+
+    ByDN = #{<<"mqtt_user:dn">> =>
+             #{<<"a">> => <<"all">>}},
+
+    ok = setup_sample(ByDN),
+    ok = setup_config(#{<<"cmd">> => <<"HGETALL mqtt_user:${cert_subject}">>}),
+
+    ok = emqx_authz_test_lib:test_samples(
+           ClientInfo,
+           [{allow, subscribe, <<"a">>},
+            {deny, subscribe, <<"b">>}]).
+
+t_create_invalid(_Config) ->
+    AuthzConfig = raw_redis_authz_config(),
+
+    InvalidConfigs =
+        [maps:without([<<"server">>], AuthzConfig),
+         AuthzConfig#{<<"server">> => <<"unknownhost:3333">>},
+         AuthzConfig#{<<"password">> => <<"wrongpass">>},
+         AuthzConfig#{<<"database">> => <<"5678">>}],
+
+    lists:foreach(
+      fun(Config) ->
+            {error, _} = emqx_authz:update(?CMD_REPLACE, [Config]),
+            [] = emqx_authz:lookup()
+
+      end,
+      InvalidConfigs).
+
+t_redis_error(_Config) ->
+    ok = setup_config(#{<<"cmd">> => <<"INVALID COMMAND">>}),
+
+    ClientInfo = #{clientid => <<"clientid">>,
+                   username => <<"username">>,
+                   peerhost => {127,0,0,1},
+                   zone => default,
+                   listener => {tcp, default}
+                  },
+
+    deny = emqx_access_control:authorize(ClientInfo, subscribe, <<"a">>).
+
+%%------------------------------------------------------------------------------
+%% Helpers
+%%------------------------------------------------------------------------------
+
+setup_sample(AuthzData) ->
+    {ok, _} = q(["FLUSHDB"]),
+    ok = lists:foreach(
+           fun({Key, Values}) ->
+                   lists:foreach(
+                     fun({TopicFilter, Action}) ->
+                             q(["HSET", Key, TopicFilter, Action])
+                     end,
+                     maps:to_list(Values))
+           end,
+           maps:to_list(AuthzData)).
+
+setup_client_samples(ClientInfo, Samples) ->
+    #{username := Username} = ClientInfo,
+    Key = <<"mqtt_user:", Username/binary>>,
+    lists:foreach(
+      fun(Sample) ->
+              #{topics := Topics,
+                permission := <<"allow">>,
+                action := Action} = Sample,
+              lists:foreach(
+                fun(Topic) ->
+                        q(["HSET", Key, Topic, Action])
+                end,
+                Topics)
+      end,
+      Samples),
+    setup_config(#{}).
+
+setup_config(SpecialParams) ->
+    Config = maps:merge(raw_redis_authz_config(), SpecialParams),
+    {ok, _} = emqx_authz:update(?CMD_REPLACE, [Config]),
     ok.
+
+raw_redis_authz_config() ->
+    #{
+        <<"enable">> => <<"true">>,
+
+        <<"type">> => <<"redis">>,
+        <<"cmd">> => <<"HGETALL mqtt_user:${username}">>,
+        <<"database">> => <<"1">>,
+        <<"password">> => <<"public">>,
+        <<"server">> => redis_server()
+    }.
+
+redis_server() ->
+    iolist_to_binary(
+      io_lib:format(
+        "~s:~b",
+        [?REDIS_HOST, ?REDIS_PORT])).
+
+q(Command) ->
+    emqx_resource:query(
+      ?REDIS_RESOURCE,
+      {cmd, Command}).
+
+redis_config() ->
+    #{auto_reconnect => true,
+      database => 1,
+      pool_size => 8,
+      redis_type => single,
+      password => "public",
+      server => {?REDIS_HOST, ?REDIS_PORT},
+      ssl => #{enable => false}
+     }.
+
+start_apps(Apps) ->
+    lists:foreach(fun application:ensure_all_started/1, Apps).
+
+stop_apps(Apps) ->
+    lists:foreach(fun application:stop/1, Apps).

+ 248 - 0
apps/emqx_authz/test/emqx_authz_test_lib.erl

@@ -0,0 +1,248 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authz_test_lib).
+
+-include("emqx_authz.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-define(DEFAULT_CHECK_AVAIL_TIMEOUT, 1000).
+
+reset_authorizers() ->
+    reset_authorizers(deny, false).
+
+restore_authorizers() ->
+    reset_authorizers(allow, true).
+
+reset_authorizers(Nomatch, ChacheEnabled) ->
+    {ok, _} = emqx:update_config(
+                [authorization],
+                #{<<"no_match">> => atom_to_binary(Nomatch),
+                  <<"cache">> => #{<<"enable">> => atom_to_binary(ChacheEnabled)},
+                  <<"sources">> => []}),
+    ok.
+
+setup_config(BaseConfig, SpecialParams) ->
+    Config = maps:merge(BaseConfig, SpecialParams),
+    {ok, _} = emqx_authz:update(?CMD_REPLACE, [Config]),
+    ok.
+
+is_tcp_server_available(Host, Port) ->
+    case gen_tcp:connect(Host, Port, [], ?DEFAULT_CHECK_AVAIL_TIMEOUT) of
+        {ok, Socket} ->
+            gen_tcp:close(Socket),
+            true;
+        {error, _} ->
+            false
+    end.
+
+test_samples(ClientInfo, Samples) ->
+    lists:foreach(
+      fun({Expected, Action, Topic}) ->
+              ct:pal(
+                "client_info: ~p, action: ~p, topic: ~p, expected: ~p",
+                [ClientInfo, Action, Topic, Expected]),
+              ?assertEqual(
+                 Expected,
+                 emqx_access_control:authorize(
+                   ClientInfo,
+                   Action,
+                   Topic))
+      end,
+      Samples).
+
+test_no_topic_rules(ClientInfo, SetupSamples) ->
+    %% No rules
+
+    ok = SetupSamples(ClientInfo, []),
+
+    ok = test_samples(
+           ClientInfo,
+           [{deny, subscribe, <<"#">>},
+            {deny, subscribe, <<"subs">>},
+            {deny, publish, <<"pub">>}]).
+
+test_allow_topic_rules(ClientInfo, SetupSamples) ->
+    Samples = [#{
+                 topics => [<<"eq testpub1/${username}">>,
+                            <<"testpub2/${clientid}">>,
+                            <<"testpub3/#">>],
+                 permission => <<"allow">>,
+                 action => <<"publish">>
+                },
+               #{
+                 topics => [<<"eq testsub1/${username}">>,
+                            <<"testsub2/${clientid}">>,
+                            <<"testsub3/#">>],
+                 permission => <<"allow">>,
+                 action => <<"subscribe">>
+                },
+
+               #{
+                 topics => [<<"eq testall1/${username}">>,
+                            <<"testall2/${clientid}">>,
+                            <<"testall3/#">>],
+                 permission => <<"allow">>,
+                 action => <<"all">>
+                }
+              ],
+
+    ok = reset_authorizers(deny, false),
+    ok = SetupSamples(ClientInfo, Samples),
+
+    ok = test_samples(
+           ClientInfo,
+           [
+
+            %% Publish rules
+
+            {deny, publish, <<"testpub1/username">>},
+            {allow, publish, <<"testpub1/${username}">>},
+            {allow, publish, <<"testpub2/clientid">>},
+            {allow, publish, <<"testpub3/foobar">>},
+
+            {deny, publish, <<"testpub2/username">>},
+            {deny, publish, <<"testpub1/clientid">>},
+
+
+            {deny, subscribe, <<"testpub1/username">>},
+            {deny, subscribe, <<"testpub2/clientid">>},
+            {deny, subscribe, <<"testpub3/foobar">>},
+
+            %% Subscribe rules
+
+            {deny, subscribe, <<"testsub1/username">>},
+            {allow, subscribe, <<"testsub1/${username}">>},
+            {allow, subscribe, <<"testsub2/clientid">>},
+            {allow, subscribe, <<"testsub3/foobar">>},
+            {allow, subscribe, <<"testsub3/+/foobar">>},
+            {allow, subscribe, <<"testsub3/#">>},
+
+            {deny, subscribe, <<"testsub2/username">>},
+            {deny, subscribe, <<"testsub1/clientid">>},
+            {deny, subscribe, <<"testsub4/foobar">>},
+            {deny, publish, <<"testsub1/username">>},
+            {deny, publish, <<"testsub2/clientid">>},
+            {deny, publish, <<"testsub3/foobar">>},
+
+            %% All rules
+
+            {deny, subscribe, <<"testall1/username">>},
+            {allow, subscribe, <<"testall1/${username}">>},
+            {allow, subscribe, <<"testall2/clientid">>},
+            {allow, subscribe, <<"testall3/foobar">>},
+            {allow, subscribe, <<"testall3/+/foobar">>},
+            {allow, subscribe, <<"testall3/#">>},
+            {deny, publish, <<"testall1/username">>},
+            {allow, publish, <<"testall1/${username}">>},
+            {allow, publish, <<"testall2/clientid">>},
+            {allow, publish, <<"testall3/foobar">>},
+
+            {deny, subscribe, <<"testall2/username">>},
+            {deny, subscribe, <<"testall1/clientid">>},
+            {deny, subscribe, <<"testall4/foobar">>},
+            {deny, publish, <<"testall2/username">>},
+            {deny, publish, <<"testall1/clientid">>},
+            {deny, publish, <<"testall4/foobar">>}
+           ]).
+
+test_deny_topic_rules(ClientInfo, SetupSamples) ->
+    Samples = [
+               #{
+                 topics => [<<"eq testpub1/${username}">>,
+                            <<"testpub2/${clientid}">>,
+                            <<"testpub3/#">>],
+                 permission => <<"deny">>,
+                 action => <<"publish">>
+                },
+               #{
+                 topics => [<<"eq testsub1/${username}">>,
+                            <<"testsub2/${clientid}">>,
+                            <<"testsub3/#">>],
+                 permission => <<"deny">>,
+                 action => <<"subscribe">>
+                },
+
+               #{
+                 topics => [<<"eq testall1/${username}">>,
+                            <<"testall2/${clientid}">>,
+                            <<"testall3/#">>],
+                 permission => <<"deny">>,
+                 action => <<"all">>
+                }
+              ],
+
+    ok = reset_authorizers(allow, false),
+    ok = SetupSamples(ClientInfo, Samples),
+
+    ok = test_samples(
+           ClientInfo,
+           [
+
+            %% Publish rules
+
+            {allow, publish, <<"testpub1/username">>},
+            {deny, publish, <<"testpub1/${username}">>},
+            {deny, publish, <<"testpub2/clientid">>},
+            {deny, publish, <<"testpub3/foobar">>},
+
+            {allow, publish, <<"testpub2/username">>},
+            {allow, publish, <<"testpub1/clientid">>},
+
+
+            {allow, subscribe, <<"testpub1/username">>},
+            {allow, subscribe, <<"testpub2/clientid">>},
+            {allow, subscribe, <<"testpub3/foobar">>},
+
+            %% Subscribe rules
+
+            {allow, subscribe, <<"testsub1/username">>},
+            {deny, subscribe, <<"testsub1/${username}">>},
+            {deny, subscribe, <<"testsub2/clientid">>},
+            {deny, subscribe, <<"testsub3/foobar">>},
+            {deny, subscribe, <<"testsub3/+/foobar">>},
+            {deny, subscribe, <<"testsub3/#">>},
+
+            {allow, subscribe, <<"testsub2/username">>},
+            {allow, subscribe, <<"testsub1/clientid">>},
+            {allow, subscribe, <<"testsub4/foobar">>},
+            {allow, publish, <<"testsub1/username">>},
+            {allow, publish, <<"testsub2/clientid">>},
+            {allow, publish, <<"testsub3/foobar">>},
+
+            %% All rules
+
+            {allow, subscribe, <<"testall1/username">>},
+            {deny, subscribe, <<"testall1/${username}">>},
+            {deny, subscribe, <<"testall2/clientid">>},
+            {deny, subscribe, <<"testall3/foobar">>},
+            {deny, subscribe, <<"testall3/+/foobar">>},
+            {deny, subscribe, <<"testall3/#">>},
+            {allow, publish, <<"testall1/username">>},
+            {deny, publish, <<"testall1/${username}">>},
+            {deny, publish, <<"testall2/clientid">>},
+            {deny, publish, <<"testall3/foobar">>},
+
+            {allow, subscribe, <<"testall2/username">>},
+            {allow, subscribe, <<"testall1/clientid">>},
+            {allow, subscribe, <<"testall4/foobar">>},
+            {allow, publish, <<"testall2/username">>},
+            {allow, publish, <<"testall1/clientid">>},
+            {allow, publish, <<"testall4/foobar">>}
+           ]).