Kaynağa Gözat

feat(authz): support mongo single

Rory Z 4 yıl önce
ebeveyn
işleme
694f3bd67f

+ 2 - 0
.ci/docker-compose-file/docker-compose-mongo-tcp.yaml

@@ -9,6 +9,8 @@ services:
       MONGO_INITDB_DATABASE: mqtt
     networks:
       - emqx_bridge
+    ports:
+      - "27017:27017"
     command:
       --ipv6
       --bind_ip_all

+ 13 - 0
apps/emqx_authz/README.md

@@ -133,3 +133,16 @@ HSET mqtt_acl:emqx '$SYS/#' subscribe
 
 A rule of Redis ACL defines `publish`, `subscribe`, or `all `information. All lists in the rule are **allow** lists.
 
+#### Mongo
+
+Create Example BSON documents
+```sql
+db.inventory.insertOne(
+    {username: "emqx",
+     clientid: "emqx",
+     ipaddress: "127.0.0.1",
+     permission: "allow",
+     action: "all",
+     topics: ["#"]
+    })
+```

+ 12 - 0
apps/emqx_authz/etc/emqx_authz.conf

@@ -43,6 +43,18 @@ emqx_authz:{
        #     }
        #     cmd: "HGETALL mqtt_acl:%u"
        # },
+       # {
+       #     type: mongo
+       #     config: {
+       #        mongo_type: single
+       #        servers: "127.0.0.1:27017"
+       #        pool_size: 1
+       #        database: mqtt
+       #        ssl: {enable: false}
+       #     }
+       #     collection: mqtt_acl
+       #     find: { "$or": [ { "username": "%u" }, { "clientid": "%c" } ] }
+       # },
        {
            permission: allow
            action: all

+ 3 - 2
apps/emqx_authz/src/emqx_authz.erl

@@ -93,8 +93,9 @@ compile(#{topics := Topics,
          };
 
 compile(#{principal := Principal,
-          type := redis
-         } = Rule) ->
+          type := DB
+         } = Rule) when DB =:= redis;
+                        DB =:= mongo ->
     NRule = create_resource(Rule),
     NRule#{principal => compile_principal(Principal)};
 

+ 106 - 0
apps/emqx_authz/src/emqx_authz_mongo.erl

@@ -0,0 +1,106 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_authz_mongo).
+
+-include("emqx_authz.hrl").
+-include_lib("emqx/include/emqx.hrl").
+-include_lib("emqx/include/logger.hrl").
+
+%% ACL Callbacks
+-export([ authorize/4
+        , description/0
+        ]).
+
+-ifdef(TEST).
+-compile(export_all).
+-compile(nowarn_export_all).
+-endif.
+
+description() ->
+    "AuthZ with Mongo".
+
+authorize(Client, PubSub, Topic,
+            #{resource_id := ResourceID,
+              collection := Collection,
+              find := Find
+             }) ->
+    case emqx_resource:query(ResourceID, {find, Collection, replvar(Find, Client), #{}}) of
+        {error, Reason} ->
+            ?LOG(error, "[AuthZ] Query mongo error: ~p", [Reason]),
+            nomatch;
+        [] -> nomatch;
+        Rows ->
+            do_authorize(Client, PubSub, Topic, Rows)
+    end.
+
+do_authorize(_Client, _PubSub, _Topic, []) ->
+    nomatch;
+do_authorize(Client, PubSub, Topic, [Rule | Tail]) ->
+    case match(Client, PubSub, Topic, Rule)
+    of
+        {matched, Permission} -> {matched, Permission};
+        nomatch -> do_authorize(Client, PubSub, Topic, Tail)
+    end.
+
+match(Client, PubSub, Topic,
+      #{<<"topics">> := Topics,
+        <<"permission">> := Permission,
+        <<"action">> := Action
+       }) ->
+    Rule = #{<<"principal">> => all,
+             <<"permission">> => Permission,
+             <<"topics">> => Topics,
+             <<"action">> => Action
+            },
+    #{simple_rule :=
+      #{permission := NPermission} = NRule
+     } = hocon_schema:check_plain(
+            emqx_authz_schema,
+            #{<<"simple_rule">> => Rule},
+            #{atom_key => true},
+            [simple_rule]),
+    case emqx_authz:match(Client, PubSub, Topic, emqx_authz:compile(NRule)) of
+        true -> {matched, NPermission};
+        false -> nomatch
+    end.
+
+replvar(Find, #{clientid := Clientid,
+                username := Username,
+                peerhost := IpAddress
+               }) ->
+    Fun = fun
+              _Fun(K, V, AccIn) when is_map(V) -> maps:put(K, maps:fold(_Fun, AccIn, V), AccIn);
+              _Fun(K, V, AccIn) when is_list(V) ->
+                  maps:put(K, [ begin
+                                    [{K1, V1}] = maps:to_list(M),
+                                    _Fun(K1, V1, AccIn)
+                                end || M <- V],
+                           AccIn);
+              _Fun(K, V, AccIn) when is_binary(V) ->
+                  V1 = re:replace(V,  "%c", bin(Clientid), [global, {return, binary}]),
+                  V2 = re:replace(V1, "%u", bin(Username), [global, {return, binary}]),
+                  V3 = re:replace(V2, "%a", inet_parse:ntoa(IpAddress), [global, {return, binary}]),
+                  maps:put(K, V3, AccIn);
+              _Fun(K, V, AccIn) -> maps:put(K, V, AccIn)
+          end,
+    maps:fold(Fun, #{}, Find).
+
+bin(A) when is_atom(A) -> atom_to_binary(A, utf8);
+bin(B) when is_binary(B) -> B;
+bin(L) when is_list(L) -> list_to_binary(L);
+bin(X) -> X.
+

+ 7 - 1
apps/emqx_authz/src/emqx_authz_schema.erl

@@ -16,6 +16,13 @@ structs() -> ["emqx_authz"].
 fields("emqx_authz") ->
     [ {rules, rules()}
     ];
+fields(mongo_connector) ->
+    [ {principal, principal()}
+    , {type, #{type => hoconsc:enum([mongo])}}
+    , {config, #{type => map()}}
+    , {collection, #{type => atom()}}
+    , {find, #{type => map()}}
+    ];
 fields(redis_connector) ->
     [ {principal, principal()}
     , {type, #{type => hoconsc:enum([redis])}}
@@ -27,7 +34,6 @@ fields(redis_connector) ->
       }
     , {cmd, query()}
     ];
-
 fields(sql_connector) ->
     [ {principal, principal() }
     , {type, #{type => hoconsc:enum([mysql, pgsql])}}

+ 20 - 14
apps/emqx_connector/src/emqx_connector_mongo.erl

@@ -38,7 +38,7 @@ structs() -> [""].
 fields("") ->
     mongodb_fields() ++
     mongodb_topology_fields() ++
-    mongodb_rs_set_name_fields() ++
+    % mongodb_rs_set_name_fields() ++
     emqx_connector_schema_lib:ssl_fields().
 
 on_jsonify(Config) ->
@@ -71,7 +71,7 @@ on_start(InstId, #{servers := Servers,
 
     PoolName = emqx_plugin_libs_pool:pool_name(InstId),
     _ = emqx_plugin_libs_pool:start_pool(PoolName, ?MODULE, Opts ++ SslOpts),
-    {ok, #{pool => PoolName,
+    {ok, #{poolname => PoolName,
            type => Type,
            test_conn => TestConn,
            test_opts => TestOpts}}.
@@ -82,23 +82,27 @@ on_stop(InstId, #{poolname := PoolName}) ->
 
 on_query(InstId, {Action, Collection, Selector, Docs}, AfterQuery, #{poolname := PoolName} = State) ->
     logger:debug("mongodb connector ~p received request: ~p, at state: ~p", [InstId, {Action, Collection, Selector, Docs}, State]),
-    case Result = ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of
+    case ecpool:pick_and_do(PoolName, {?MODULE, mongo_query, [Action, Collection, Selector, Docs]}, no_handover) of
         {error, Reason} ->
             logger:debug("mongodb connector ~p do sql query failed, request: ~p, reason: ~p", [InstId, {Action, Collection, Selector, Docs}, Reason]),
-            emqx_resource:query_failed(AfterQuery);
-        _ ->
-            emqx_resource:query_success(AfterQuery)
-    end,
-    Result.
+            emqx_resource:query_failed(AfterQuery),
+            {error, Reason};
+        {ok, Cursor} when is_pid(Cursor) ->
+            emqx_resource:query_success(AfterQuery),
+            mc_cursor:foldl(fun(O, Acc2) -> [O|Acc2] end, [], Cursor, 1000);
+        Result ->
+            emqx_resource:query_success(AfterQuery),
+            Result
+    end.
 
 -dialyzer({nowarn_function, [on_health_check/2]}).
-on_health_check(_InstId, #{test_opts := TestOpts}) ->
+on_health_check(_InstId, #{test_opts := TestOpts} = State) ->
     case mc_worker_api:connect(TestOpts) of
         {ok, TestConn} ->
             mc_worker_api:disconnect(TestConn),
-            {ok, true};
+            {ok, State};
         {error, _} ->
-            {ok, false}
+            {error, health_check_failed, State}
     end.
 
 %% ===================================================================
@@ -197,11 +201,12 @@ mongodb_topology_fields() ->
     , {min_heartbeat_frequency_ms, fun duration/1}
     ].
 
-mongodb_rs_set_name_fields() ->
-    [ {rs_set_name, fun emqx_connector_schema_lib:database/1}
-    ].
+% mongodb_rs_set_name_fields() ->
+%     [ {rs_set_name, fun emqx_connector_schema_lib:database/1}
+%     ].
 
 auth_source(type) -> binary();
+auth_source(nullable) -> true;
 auth_source(_) -> undefined.
 
 servers(type) -> binary();
@@ -213,4 +218,5 @@ mongo_type(default) -> single;
 mongo_type(_) -> undefined.
 
 duration(type) -> emqx_schema:duration_ms();
+duration(nullable) -> true;
 duration(_) -> undefined.

+ 1 - 1
apps/emqx_connector/src/emqx_connector_redis.erl

@@ -140,7 +140,7 @@ on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) ->
                 eredis_cluster_pool_worker:is_connected(Pid) =:= true
             end, Workers) of
         true -> {ok, State};
-        false -> {error, test_query_failed, State}
+        false -> {error, health_check_failed, State}
     end;
 on_health_check(_InstId, #{poolname := PoolName} = State) ->
     emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State).

+ 2 - 2
apps/emqx_connector/src/emqx_connector_schema_lib.erl

@@ -99,11 +99,11 @@ pool_size(validator) -> [?MIN(1), ?MAX(64)];
 pool_size(_) -> undefined.
 
 username(type) -> binary();
-username(default) -> "root";
+username(nullable) -> true;
 username(_) -> undefined.
 
 password(type) -> binary();
-password(default) -> "";
+password(nullable) -> true;
 password(_) -> undefined.
 
 auto_reconnect(type) -> boolean();

+ 1 - 1
apps/emqx_plugin_libs/src/emqx_plugin_libs_pool.erl

@@ -54,5 +54,5 @@ health_check(PoolName, CheckFunc, State) when is_function(CheckFunc) ->
     end || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
     case length(Status) > 0 andalso lists:all(fun(St) -> St =:= true end, Status) of
         true -> {ok, State};
-        false -> {error, test_query_failed, State}
+        false -> {error, health_check_failed, State}
     end.