| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- %%--------------------------------------------------------------------
- %% Copyright (c) 2020-2021 EMQ Technologies Co., Ltd. All Rights Reserved.
- %%
- %% Licensed under the Apache License, Version 2.0 (the "License");
- %% you may not use this file except in compliance with the License.
- %% You may obtain a copy of the License at
- %%
- %% http://www.apache.org/licenses/LICENSE-2.0
- %%
- %% Unless required by applicable law or agreed to in writing, software
- %% distributed under the License is distributed on an "AS IS" BASIS,
- %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- %% See the License for the specific language governing permissions and
- %% limitations under the License.
- %%--------------------------------------------------------------------
- -module(emqx_acl_pgsql).
- -include("emqx_auth_pgsql.hrl").
- -include_lib("emqx/include/emqx.hrl").
- -include_lib("emqx/include/logger.hrl").
- %% ACL callbacks
- -export([ register_metrics/0
- , check_acl/5
- , description/0
- ]).
%% @doc Calls emqx_metrics:ensure/1 on every entry of the ?ACL_METRICS
%% list, in list order. Always returns `ok'.
-spec(register_metrics() -> ok).
register_metrics() ->
    Ensure = fun(Metric) -> emqx_metrics:ensure(Metric) end,
    lists:foreach(Ensure, ?ACL_METRICS).
%% @doc ACL callback. Delegates the decision to do_check_acl/6 using the
%% connection pool found under the `pool' key of the state map, then
%% increments exactly one ACL metric that mirrors the outcome:
%%   ok            -> ?ACL_METRICS(ignore)
%%   {stop, allow} -> ?ACL_METRICS(allow)
%%   {stop, deny}  -> ?ACL_METRICS(deny)
%% The outcome of do_check_acl/6 is returned unchanged.
check_acl(ClientInfo, PubSub, Topic, NoMatchAction, #{pool := Pool} = State) ->
    Result = do_check_acl(Pool, ClientInfo, PubSub, Topic, NoMatchAction, State),
    Metric = case Result of
                 ok            -> ?ACL_METRICS(ignore);
                 {stop, allow} -> ?ACL_METRICS(allow);
                 {stop, deny}  -> ?ACL_METRICS(deny)
             end,
    emqx_metrics:inc(Metric),
    Result.
%% Runs the configured ACL query and evaluates the resulting rules.
%%
%% Returns:
%%   ok            - no decision was made: the username starts with `$',
%%                   the query returned no rows, no rule matched, or the
%%                   query failed (the failure is logged at error level)
%%   {stop, allow} - the first matching rule allows the action
%%   {stop, deny}  - the first matching rule denies the action
%%
%% NOTE(review): compile/1 can throw `undefined_who' (see who/3); that
%% throw is not caught here.
do_check_acl(_Pool, #{username := <<$$, _/binary>>}, _PubSub, _Topic, _NoMatchAction, _State) ->
    %% Usernames beginning with `$' skip the database lookup entirely.
    ok;
do_check_acl(Pool, ClientInfo, PubSub, Topic, _NoMatchAction, #{acl_query := {AclSql, AclParams}}) ->
    case emqx_auth_pgsql_cli:equery(Pool, AclSql, AclParams, ClientInfo) of
        {ok, _Columns, []} ->
            ok;
        {ok, _Columns, Rows} ->
            Compiled = compile(Rows),
            %% Keep only the rules that apply to this kind of action.
            Applicable = filter(PubSub, Compiled),
            case match(ClientInfo, Topic, Applicable) of
                nomatch ->
                    ok;
                {matched, Permission} when Permission =:= allow; Permission =:= deny ->
                    {stop, Permission}
            end;
        {error, Reason} ->
            ?LOG(error, "[Postgres] do_check_acl error: ~p~n", [Reason]),
            ok
    end.
%% Walks the rule list front to back and returns the first result of
%% emqx_access_rule:match/3 that is `{matched, AllowDeny}'. Returns
%% `nomatch' when the list is exhausted (or empty to begin with).
match(_ClientInfo, _Topic, []) ->
    nomatch;
match(ClientInfo, Topic, [Rule | Remaining]) ->
    case emqx_access_rule:match(ClientInfo, Topic, Rule) of
        {matched, _AllowDeny} = Hit ->
            Hit;
        nomatch ->
            match(ClientInfo, Topic, Remaining)
    end.
%% Keeps the rules whose access field (third element of a 4-tuple) is
%% either exactly `PubSub' or the atom `pubsub'. Elements that are not
%% 4-tuples are dropped. Relative order is preserved.
filter(PubSub, Rules) ->
    Applies = fun({_Permission, _Who, Access, _Topics}) ->
                      Access =:= PubSub orelse Access =:= pubsub;
                 (_NotARule) ->
                      false
              end,
    lists:filter(Applies, Rules).
%% Turns the rows returned by the ACL query into compiled access rules.
%% Every row must be a 6-tuple
%%   {Allow, IpAddr, Username, ClientId, Access, Topic}.
%% Each compiled rule is prepended to the accumulator as the rows are
%% visited, so the returned list is in the REVERSE order of `Rows'.
compile(Rows) ->
    lists:foldl(fun(Row, Compiled) -> [compile_row(Row) | Compiled] end, [], Rows).

%% Builds the rule term {Permission, Who, Access, [Topic]} for a single
%% row and passes it to emqx_access_rule:compile/1.
compile_row({Allow, IpAddr, Username, ClientId, Access, Topic}) ->
    Who = who(IpAddr, Username, ClientId),
    emqx_access_rule:compile({allow(Allow), Who, access(Access), [topic(Topic)]}).
%% Derives the "who" part of a rule from the ipaddr / username / clientid
%% columns of a row.
%%   - username <<"$all">>        -> `all'
%%   - all three columns `null'   -> throws `undefined_who'
%%   - exactly one non-empty col  -> that single {Kind, Value} pair
%%   - otherwise                  -> {'and', Pairs} of the non-empty columns
%% The ipaddr column is converted to a string via b2l/1 before use.
%%
%% NOTE(review): when every column is empty but not all are `null'
%% (e.g. empty binaries) the result is {'and', []} - confirm how
%% emqx_access_rule treats an empty condition list.
who(_, <<"$all">>, _) ->
    all;
who(null, null, null) ->
    throw(undefined_who);
who(CIDR, Username, ClientId) ->
    Candidates = [{ipaddr, b2l(CIDR)}, {user, Username}, {client, ClientId}],
    Present = lists:filter(fun({_Kind, Value}) -> not empty(Value) end, Candidates),
    case Present of
        [Single] -> Single;
        Several  -> {'and', Several}
    end.
%% Translates the allow flag, given either as an integer or as a
%% one-character binary: 1 / <<"1">> -> allow, 0 / <<"0">> -> deny.
%% Any other value raises function_clause.
allow(Flag) when Flag =:= 1; Flag =:= <<"1">> -> allow;
allow(Flag) when Flag =:= 0; Flag =:= <<"0">> -> deny.
%% Translates the access code, given either as an integer or as a
%% one-character binary:
%%   1 -> subscribe, 2 -> publish, 3 -> pubsub.
%% Any other value raises function_clause.
access(Code) when Code =:= 1; Code =:= <<"1">> -> subscribe;
access(Code) when Code =:= 2; Code =:= <<"2">> -> publish;
access(Code) when Code =:= 3; Code =:= <<"3">> -> pubsub.
%% A topic value that starts with the literal prefix "eq " (note the
%% trailing space) becomes {eq, Rest}; anything else is returned as-is.
topic(Topic) ->
    case Topic of
        <<"eq ", Exact/binary>> -> {eq, Exact};
        _Other                  -> Topic
    end.
%% @doc Human-readable name of this ACL backend.
-spec(description() -> string()).
description() ->
    "ACL with Postgres".
%% Binary-to-list conversion that lets the atom `null' pass through
%% unchanged; every other value is handed to binary_to_list/1.
b2l(Value) ->
    case Value of
        null -> null;
        Bin  -> binary_to_list(Bin)
    end.
%% True for the three values treated as "no value": the atom `null',
%% the empty string/list and the empty binary. False for anything else.
empty(Value) ->
    lists:member(Value, [null, "", <<>>]).
|