JimMoen 4 лет назад
Родитель
Сommit
1d3558ebe0
3 измененных файлов с 51 добавлено и 30 удалено
  1. 17 17
      apps/emqx/src/emqx_topic.erl
  2. 4 2
      apps/emqx/test/emqx_topic_SUITE.erl
  3. 30 11
      apps/emqx_authz/src/emqx_authz.erl

+ 17 - 17
apps/emqx/src/emqx_topic.erl

@@ -54,11 +54,11 @@ wildcard(Topic) when is_binary(Topic) ->
     wildcard(words(Topic));
     wildcard(words(Topic));
 wildcard([]) ->
 wildcard([]) ->
     false;
     false;
-wildcard(['#'|_]) ->
+wildcard(['#' | _]) ->
     true;
     true;
-wildcard(['+'|_]) ->
+wildcard(['+' | _]) ->
     true;
     true;
-wildcard([_H|T]) ->
+wildcard([_H | T]) ->
     wildcard(T).
     wildcard(T).
 
 
 %% @doc Match Topic name with filter.
 %% @doc Match Topic name with filter.
@@ -73,17 +73,17 @@ match(Name, Filter) when is_binary(Name), is_binary(Filter) ->
     match(words(Name), words(Filter));
     match(words(Name), words(Filter));
 match([], []) ->
 match([], []) ->
     true;
     true;
-match([H|T1], [H|T2]) ->
+match([H | T1], [H | T2]) ->
     match(T1, T2);
     match(T1, T2);
-match([_H|T1], ['+'|T2]) ->
+match([_H | T1], ['+' | T2]) ->
     match(T1, T2);
     match(T1, T2);
 match(_, ['#']) ->
 match(_, ['#']) ->
     true;
     true;
-match([_H1|_], [_H2|_]) ->
+match([_H1 | _], [_H2 | _]) ->
     false;
     false;
-match([_H1|_], []) ->
+match([_H1 | _], []) ->
     false;
     false;
-match([], [_H|_T2]) ->
+match([], [_H | _T2]) ->
     false.
     false.
 
 
 %% @doc Validate topic name or filter
 %% @doc Validate topic name or filter
@@ -110,13 +110,13 @@ validate2([]) ->
     true;
     true;
 validate2(['#']) -> % end with '#'
 validate2(['#']) -> % end with '#'
     true;
     true;
-validate2(['#'|Words]) when length(Words) > 0 ->
+validate2(['#' | Words]) when length(Words) > 0 ->
     error('topic_invalid_#');
     error('topic_invalid_#');
-validate2([''|Words]) ->
+validate2(['' | Words]) ->
     validate2(Words);
     validate2(Words);
-validate2(['+'|Words]) ->
+validate2(['+' | Words]) ->
     validate2(Words);
     validate2(Words);
-validate2([W|Words]) ->
+validate2([W | Words]) ->
     validate3(W) andalso validate2(Words).
     validate3(W) andalso validate2(Words).
 
 
 validate3(<<>>) ->
 validate3(<<>>) ->
@@ -164,7 +164,7 @@ word(<<"#">>) -> '#';
 word(Bin)     -> Bin.
 word(Bin)     -> Bin.
 
 
 %% @doc '$SYS' Topic.
 %% @doc '$SYS' Topic.
--spec(systop(atom()|string()|binary()) -> topic()).
+-spec(systop(atom() | string() | binary()) -> topic()).
 systop(Name) when is_atom(Name); is_list(Name) ->
 systop(Name) when is_atom(Name); is_list(Name) ->
     iolist_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name]));
     iolist_to_binary(lists:concat(["$SYS/brokers/", node(), "/", Name]));
 systop(Name) when is_binary(Name) ->
 systop(Name) when is_binary(Name) ->
@@ -175,10 +175,10 @@ feed_var(Var, Val, Topic) ->
     feed_var(Var, Val, words(Topic), []).
     feed_var(Var, Val, words(Topic), []).
 feed_var(_Var, _Val, [], Acc) ->
 feed_var(_Var, _Val, [], Acc) ->
     join(lists:reverse(Acc));
     join(lists:reverse(Acc));
-feed_var(Var, Val, [Var|Words], Acc) ->
-    feed_var(Var, Val, Words, [Val|Acc]);
-feed_var(Var, Val, [W|Words], Acc) ->
-    feed_var(Var, Val, Words, [W|Acc]).
+feed_var(Var, Val, [Var | Words], Acc) ->
+    feed_var(Var, Val, Words, [Val | Acc]);
+feed_var(Var, Val, [W | Words], Acc) ->
+    feed_var(Var, Val, Words, [W | Acc]).
 
 
 -spec(join(list(binary())) -> binary()).
 -spec(join(list(binary())) -> binary()).
 join([]) ->
 join([]) ->

+ 4 - 2
apps/emqx/test/emqx_topic_SUITE.erl

@@ -184,9 +184,11 @@ t_feed_var(_) ->
     ?assertEqual(<<"$queue/client/clientId">>,
     ?assertEqual(<<"$queue/client/clientId">>,
                  feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)),
                  feed_var(<<"$c">>, <<"clientId">>, <<"$queue/client/$c">>)),
     ?assertEqual(<<"username/test/client/x">>,
     ?assertEqual(<<"username/test/client/x">>,
-                 feed_var(?PH_USERNAME, <<"test">>, <<"username/", ?PH_USERNAME/binary, "/client/x">>)),
+                 feed_var( ?PH_USERNAME, <<"test">>
+                         , <<"username/", ?PH_USERNAME/binary, "/client/x">>)),
     ?assertEqual(<<"username/test/client/clientId">>,
     ?assertEqual(<<"username/test/client/clientId">>,
-                 feed_var(?PH_CLIENTID, <<"clientId">>, <<"username/test/client/", ?PH_CLIENTID/binary>>)).
+                 feed_var( ?PH_CLIENTID, <<"clientId">>
+                         , <<"username/test/client/", ?PH_CLIENTID/binary>>)).
 
 
 long_topic() ->
 long_topic() ->
     iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).
     iolist_to_binary([[integer_to_list(I), "/"] || I <- lists:seq(0, 66666)]).

+ 30 - 11
apps/emqx_authz/src/emqx_authz.erl

@@ -66,11 +66,14 @@ move(Type, Cmd) ->
     move(Type, Cmd, #{}).
     move(Type, Cmd, #{}).
 
 
 move(Type, #{<<"before">> := Before}, Opts) ->
 move(Type, #{<<"before">> := Before}, Opts) ->
-    emqx:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), ?CMD_MOVE_BEFORE(type(Before))}, Opts);
+    emqx:update_config( ?CONF_KEY_PATH
+                      , {?CMD_MOVE, type(Type), ?CMD_MOVE_BEFORE(type(Before))}, Opts);
 move(Type, #{<<"after">> := After}, Opts) ->
 move(Type, #{<<"after">> := After}, Opts) ->
-    emqx:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), ?CMD_MOVE_AFTER(type(After))}, Opts);
+    emqx:update_config( ?CONF_KEY_PATH
+                      , {?CMD_MOVE, type(Type), ?CMD_MOVE_AFTER(type(After))}, Opts);
 move(Type, Position, Opts) ->
 move(Type, Position, Opts) ->
-    emqx:update_config(?CONF_KEY_PATH, {?CMD_MOVE, type(Type), Position}, Opts).
+    emqx:update_config( ?CONF_KEY_PATH
+                      , {?CMD_MOVE, type(Type), Position}, Opts).
 
 
 update(Cmd, Sources) ->
 update(Cmd, Sources) ->
     update(Cmd, Sources, #{}).
     update(Cmd, Sources, #{}).
@@ -157,7 +160,8 @@ do_post_update({{?CMD_REPLACE, Type}, Source}, _NewSources) when is_map(Source)
     {OldSource, Front, Rear} = take(Type, OldInitedSources),
     {OldSource, Front, Rear} = take(Type, OldInitedSources),
     ok = ensure_resource_deleted(OldSource),
     ok = ensure_resource_deleted(OldSource),
     InitedSources = init_sources(check_sources([Source])),
     InitedSources = init_sources(check_sources([Source])),
-    ok = emqx_hooks:put('client.authorize', {?MODULE, authorize, [Front ++ InitedSources ++ Rear]}, -1),
+    ok = emqx_hooks:put( 'client.authorize'
+                       , {?MODULE, authorize, [Front ++ InitedSources ++ Rear]}, -1),
     ok = emqx_authz_cache:drain_cache();
     ok = emqx_authz_cache:drain_cache();
 do_post_update({{?CMD_DELETE, Type}, _Source}, _NewSources) ->
 do_post_update({{?CMD_DELETE, Type}, _Source}, _NewSources) ->
     OldInitedSources = lookup(),
     OldInitedSources = lookup(),
@@ -269,7 +273,7 @@ init_source(#{type := DB,
         {error, Reason} -> error({load_config_error, Reason});
         {error, Reason} -> error({load_config_error, Reason});
         Id -> Source#{annotations =>
         Id -> Source#{annotations =>
                       #{id => Id,
                       #{id => Id,
-                        query => Mod:parse_query(SQL)
+                        query => erlang:apply(Mod, parse_query, [SQL])
                        }
                        }
                    }
                    }
     end.
     end.
@@ -279,22 +283,36 @@ init_source(#{type := DB,
 %%--------------------------------------------------------------------
 %%--------------------------------------------------------------------
 
 
 %% @doc Check AuthZ
 %% @doc Check AuthZ
--spec(authorize(emqx_types:clientinfo(), emqx_types:all(), emqx_types:topic(), allow | deny, sources())
+-spec(authorize( emqx_types:clientinfo()
+               , emqx_types:all()
+               , emqx_types:topic()
+               , allow | deny
+               , sources())
       -> {stop, allow} | {ok, deny}).
       -> {stop, allow} | {ok, deny}).
 authorize(#{username := Username,
 authorize(#{username := Username,
             peerhost := IpAddress
             peerhost := IpAddress
            } = Client, PubSub, Topic, DefaultResult, Sources) ->
            } = Client, PubSub, Topic, DefaultResult, Sources) ->
     case do_authorize(Client, PubSub, Topic, Sources) of
     case do_authorize(Client, PubSub, Topic, Sources) of
         {matched, allow} ->
         {matched, allow} ->
-            ?SLOG(info, #{msg => "authorization_permission_allowed", username => Username, ipaddr => IpAddress, topic => Topic}),
+            ?SLOG(info, #{msg => "authorization_permission_allowed",
+                          username => Username,
+                          ipaddr => IpAddress,
+                          topic => Topic}),
             emqx_metrics:inc(?AUTHZ_METRICS(allow)),
             emqx_metrics:inc(?AUTHZ_METRICS(allow)),
             {stop, allow};
             {stop, allow};
         {matched, deny} ->
         {matched, deny} ->
-            ?SLOG(info, #{msg => "authorization_permission_denied", username => Username, ipaddr => IpAddress, topic => Topic}),
+            ?SLOG(info, #{msg => "authorization_permission_denied",
+                          username => Username,
+                          ipaddr => IpAddress,
+                          topic => Topic}),
             emqx_metrics:inc(?AUTHZ_METRICS(deny)),
             emqx_metrics:inc(?AUTHZ_METRICS(deny)),
             {stop, deny};
             {stop, deny};
         nomatch ->
         nomatch ->
-            ?SLOG(info, #{msg => "authorization_failed_nomatch", username => Username, ipaddr => IpAddress, topic => Topic, reason => "no-match rule"}),
+            ?SLOG(info, #{msg => "authorization_failed_nomatch",
+                          username => Username,
+                          ipaddr => IpAddress,
+                          topic => Topic,
+                          reason => "no-match rule"}),
             {stop, DefaultResult}
             {stop, DefaultResult}
     end.
     end.
 
 
@@ -311,7 +329,7 @@ do_authorize(Client, PubSub, Topic, [#{type := file} = F | Tail]) ->
 do_authorize(Client, PubSub, Topic,
 do_authorize(Client, PubSub, Topic,
                [Connector = #{type := Type} | Tail] ) ->
                [Connector = #{type := Type} | Tail] ) ->
     Mod = authz_module(Type),
     Mod = authz_module(Type),
-    case Mod:authorize(Client, PubSub, Topic, Connector) of
+    case erlang:apply(Mod, authorize, [Client, PubSub, Topic, Connector]) of
         nomatch -> do_authorize(Client, PubSub, Topic, Tail);
         nomatch -> do_authorize(Client, PubSub, Topic, Tail);
         Matched -> Matched
         Matched -> Matched
     end.
     end.
@@ -383,7 +401,8 @@ type(postgresql) -> postgresql;
 type(<<"postgresql">>) -> postgresql;
 type(<<"postgresql">>) -> postgresql;
 type('built-in-database') -> 'built-in-database';
 type('built-in-database') -> 'built-in-database';
 type(<<"built-in-database">>) -> 'built-in-database';
 type(<<"built-in-database">>) -> 'built-in-database';
-type(Unknown) -> error({unknown_authz_source_type, Unknown}). % should never happend if the input is type-checked by hocon schema
+%% should never happend if the input is type-checked by hocon schema
+type(Unknown) -> error({unknown_authz_source_type, Unknown}).
 
 
 %% @doc where the acl.conf file is stored.
 %% @doc where the acl.conf file is stored.
 acl_conf_file() ->
 acl_conf_file() ->