Jelajahi Sumber

Merge pull request #1748 from emqtt/emqx30-feng

Improve the emqx_listeners module
Feng Lee 7 tahun lalu
induk
melakukan
d167a5c99a
4 mengubah file dengan 91 tambahan dan 61 penghapusan
  1. 1 2
      etc/emqx.conf
  2. 10 12
      src/emqx_access_control.erl
  3. 5 9
      src/emqx_acl_internal.erl
  4. 75 38
      src/emqx_listeners.erl

+ 1 - 2
etc/emqx.conf

@@ -424,7 +424,7 @@ log.syslog.level = error
 ## Value: true | false
 allow_anonymous = true
 
-## TODO: Allow or deny if no ACL rules match.
+## Allow or deny if no ACL rules matched.
 ##
 ## Value: allow | deny
 acl_nomatch = allow
@@ -455,7 +455,6 @@ acl_cache_max_size = 32
 ## Default: 1 minute
 acl_cache_ttl = 1m
 
-
 ##--------------------------------------------------------------------
 ## MQTT Protocol
 ##--------------------------------------------------------------------

+ 10 - 12
src/emqx_access_control.erl

@@ -32,8 +32,6 @@
 -define(TAB, ?MODULE).
 -define(SERVER, ?MODULE).
 
--record(state, {}).
-
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------
@@ -104,6 +102,15 @@ check_acl(Credentials, PubSub, Topic, AclMods, true) ->
             AclResult
     end.
 
+do_check_acl(#{zone := Zone}, _PubSub, _Topic, []) ->
+    emqx_zone:get_env(Zone, acl_nomatch, deny);
+do_check_acl(Credentials, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
+    case Mod:check_acl({Credentials, PubSub, Topic}, State) of
+        allow  -> allow;
+        deny   -> deny;
+        ignore -> do_check_acl(Credentials, PubSub, Topic, AclMods)
+    end.
+
 -spec(reload_acl() -> list(ok | {error, term()})).
 reload_acl() ->
     [Mod:reload_acl(State) || {Mod, State, _Seq} <- lookup_mods(acl)].
@@ -143,7 +150,7 @@ stop() ->
 
 init([]) ->
     _ = emqx_tables:new(?TAB, [set, protected, {read_concurrency, true}]),
-    {ok, #state{}}.
+    {ok, #{}}.
 
 handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
     Mods = lookup_mods(Type),
@@ -194,15 +201,6 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-do_check_acl(#client{zone = Zone}, _PubSub, _Topic, []) ->
-    emqx_zone:get_env(Zone, acl_nomatch, deny);
-do_check_acl(Client, PubSub, Topic, [{Mod, State, _Seq}|AclMods]) ->
-    case Mod:check_acl({Client, PubSub, Topic}, State) of
-        allow  -> allow;
-        deny   -> deny;
-        ignore -> do_check_acl(Client, PubSub, Topic, AclMods)
-    end.
-
 %%--------------------------------------------------------------------
 %% Internal functions
 %%--------------------------------------------------------------------

+ 5 - 9
src/emqx_acl_internal.erl

@@ -70,11 +70,8 @@ filter(_PubSub, {_AllowDeny, _Who, _, _Topics}) ->
     false.
 
 %% @doc Check ACL
--spec(check_acl({credentials(), pubsub(), topic()}, #{})
-      -> allow | deny | ignore).
-check_acl(_Who, #{acl_file := undefined}) ->
-    allow;
-check_acl({Credentials, PubSub, Topic}, #{}) ->
+-spec(check_acl({credentials(), pubsub(), topic()}, #{}) -> allow | deny | ignore).
+check_acl({Credentials, PubSub, Topic}, _State) ->
     case match(Credentials, Topic, lookup(PubSub)) of
         {matched, allow} -> allow;
         {matched, deny}  -> deny;
@@ -98,12 +95,11 @@ match(Credentials, Topic, [Rule|Rules]) ->
     end.
 
 -spec(reload_acl(#{}) -> ok | {error, term()}).
-reload_acl(#{acl_file := undefined}) ->
-    ok;
 reload_acl(#{acl_file := AclFile}) ->
     case catch load_rules_from_file(AclFile) of
-        true -> emqx_logger:error("Reload acl_file ~s successfully", [AclFile]),
-                ok;
+        true ->
+            emqx_logger:info("Reload acl_file ~s successfully", [AclFile]),
+            ok;
         {'EXIT', Error} ->
             {error, Error}
     end.

+ 75 - 38
src/emqx_listeners.erl

@@ -18,49 +18,64 @@
 -include("emqx_mqtt.hrl").
 
 -export([start/0, restart/0, stop/0]).
--export([start_listener/1, stop_listener/1, restart_listener/1]).
+-export([start_listener/1, start_listener/3]).
+-export([restart_listener/1, restart_listener/3]).
+-export([stop_listener/1, stop_listener/3]).
 
--type(listener() :: {atom(), esockd:listen_on(), [esockd:option()]}).
+-type(listener() :: {esockd:proto(), esockd:listen_on(), [esockd:option()]}).
 
-%% @doc Start all listeners
+%% @doc Start all listeners.
 -spec(start() -> ok).
 start() ->
     lists:foreach(fun start_listener/1, emqx_config:get_env(listeners, [])).
 
-%% Start MQTT/TCP listener
 -spec(start_listener(listener()) -> {ok, pid()} | {error, term()}).
-start_listener({tcp, ListenOn, Options}) ->
+start_listener({Proto, ListenOn, Options}) ->
+    case start_listener(Proto, ListenOn, Options) of
+        {ok, _} ->
+            io:format("Start mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]);
+        {error, Reason} ->
+            io:format(standard_error, "Failed to start mqtt:~s listener on ~s - ~p!",
+                      [Proto, format(ListenOn), Reason])
+    end.
+
+%% Start MQTT/TCP listener
+-spec(start_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
+      -> {ok, pid()} | {error, term()}).
+start_listener(tcp, ListenOn, Options) ->
     start_mqtt_listener('mqtt:tcp', ListenOn, Options);
 
 %% Start MQTT/TLS listener
-start_listener({Proto, ListenOn, Options}) when Proto == ssl; Proto == tls ->
+start_listener(Proto, ListenOn, Options) when Proto == ssl; Proto == tls ->
     start_mqtt_listener('mqtt:ssl', ListenOn, Options);
 
 %% Start MQTT/WS listener
-start_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws ->
+start_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
     Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws_connection, Options}]}]),
-    NumAcceptors = proplists:get_value(acceptors, Options, 4),
-    MaxConnections = proplists:get_value(max_connections, Options, 1024),
-    TcpOptions = proplists:get_value(tcp_options, Options, []),
-    RanchOpts = [{num_acceptors, NumAcceptors},
-                 {max_connections, MaxConnections} | TcpOptions],
-    cowboy:start_clear('mqtt:ws', with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}});
+    start_http_listener(fun cowboy:start_clear/3, 'mqtt:ws', ListenOn, ranch_opts(Options), Dispatch);
 
 %% Start MQTT/WSS listener
-start_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss ->
-    Dispatch = cowboy_router:compile([{'_', [{"/mqtt", emqx_ws, []}]}]),
-    NumAcceptors = proplists:get_value(acceptors, Options, 4),
-    MaxConnections = proplists:get_value(max_clients, Options, 1024),
-    TcpOptions = proplists:get_value(tcp_options, Options, []),
-    SslOptions = proplists:get_value(ssl_options, Options, []),
-    RanchOpts = [{num_acceptors, NumAcceptors},
-                 {max_connections, MaxConnections} | TcpOptions ++ SslOptions],
-    cowboy:start_tls('mqtt:wss', with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}).
+start_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
+    Dispatch = cowboy_router:compile([{'_', [{"/mqtt",  emqx_ws_connection, Options}]}]),
+    start_http_listener(fun cowboy:start_tls/3, 'mqtt:wss', ListenOn, ranch_opts(Options), Dispatch).
 
 start_mqtt_listener(Name, ListenOn, Options) ->
     SockOpts = esockd:parse_opt(Options),
-    MFA = {emqx_connection, start_link, [Options -- SockOpts]},
-    {ok, _} = esockd:open(Name, ListenOn, merge_default(SockOpts), MFA).
+    esockd:open(Name, ListenOn, merge_default(SockOpts),
+                {emqx_connection, start_link, [Options -- SockOpts]}).
+
+start_http_listener(Start, Name, ListenOn, RanchOpts, Dispatch) ->
+    Start(Name, with_port(ListenOn, RanchOpts), #{env => #{dispatch => Dispatch}}).
+
+ranch_opts(Options) ->
+    NumAcceptors = proplists:get_value(acceptors, Options, 4),
+    MaxConnections = proplists:get_value(max_connections, Options, 1024),
+    TcpOptions = proplists:get_value(tcp_options, Options, []),
+    RanchOpts = [{num_acceptors, NumAcceptors}, {max_connections, MaxConnections} | TcpOptions],
+    case proplists:get_value(ssl_options, Options) of
+        undefined  -> RanchOpts;
+        SslOptions -> RanchOpts ++ SslOptions
+    end.
 
 with_port(Port, Opts) when is_integer(Port) ->
     [{port, Port}|Opts];
@@ -73,34 +88,49 @@ restart() ->
     lists:foreach(fun restart_listener/1, emqx_config:get_env(listeners, [])).
 
 -spec(restart_listener(listener()) -> any()).
-restart_listener({tcp, ListenOn, _Options}) ->
+restart_listener({Proto, ListenOn, Options}) ->
+    restart_listener(Proto, ListenOn, Options).
+
+-spec(restart_listener(esockd:proto(), esockd:listen_on(), [esockd:option()]) -> any()).
+restart_listener(tcp, ListenOn, _Options) ->
     esockd:reopen('mqtt:tcp', ListenOn);
-restart_listener({Proto, ListenOn, _Options}) when Proto == ssl; Proto == tls ->
+restart_listener(Proto, ListenOn, _Options) when Proto == ssl; Proto == tls ->
     esockd:reopen('mqtt:ssl', ListenOn);
-restart_listener({Proto, ListenOn, Options}) when Proto == http; Proto == ws ->
+restart_listener(Proto, ListenOn, Options) when Proto == http; Proto == ws ->
     cowboy:stop_listener('mqtt:ws'),
-    start_listener({Proto, ListenOn, Options});
-restart_listener({Proto, ListenOn, Options}) when Proto == https; Proto == wss ->
+    start_listener(Proto, ListenOn, Options);
+restart_listener(Proto, ListenOn, Options) when Proto == https; Proto == wss ->
     cowboy:stop_listener('mqtt:wss'),
-    start_listener({Proto, ListenOn, Options});
-restart_listener({Proto, ListenOn, _Opts}) ->
+    start_listener(Proto, ListenOn, Options);
+restart_listener(Proto, ListenOn, _Opts) ->
     esockd:reopen(Proto, ListenOn).
 
-%% @doc Stop all listeners
+%% @doc Stop all listeners.
 -spec(stop() -> ok).
 stop() ->
     lists:foreach(fun stop_listener/1, emqx_config:get_env(listeners, [])).
 
--spec(stop_listener(listener()) -> ok | {error, any()}).
-stop_listener({tcp, ListenOn, _Opts}) ->
+-spec(stop_listener(listener()) -> ok | {error, term()}).
+stop_listener({Proto, ListenOn, Opts}) ->
+    case stop_listener(Proto, ListenOn, Opts) of
+        ok ->
+            io:format("Stop mqtt:~s listener on ~s successfully.~n", [Proto, format(ListenOn)]);
+        {error, Reason} ->
+            io:format(standard_error, "Failed to stop mqtt:~s listener on ~s - ~p.",
+                      [Proto, format(ListenOn), Reason])
+    end.
+
+-spec(stop_listener(esockd:proto(), esockd:listen_on(), [esockd:option()])
+      -> ok | {error, term()}).
+stop_listener(tcp, ListenOn, _Opts) ->
     esockd:close('mqtt:tcp', ListenOn);
-stop_listener({Proto, ListenOn, _Opts}) when Proto == ssl; Proto == tls ->
+stop_listener(Proto, ListenOn, _Opts) when Proto == ssl; Proto == tls ->
     esockd:close('mqtt:ssl', ListenOn);
-stop_listener({Proto, _ListenOn, _Opts}) when Proto == http; Proto == ws ->
+stop_listener(Proto, _ListenOn, _Opts) when Proto == http; Proto == ws ->
     cowboy:stop_listener('mqtt:ws');
-stop_listener({Proto, _ListenOn, _Opts}) when Proto == https; Proto == wss ->
+stop_listener(Proto, _ListenOn, _Opts) when Proto == https; Proto == wss ->
     cowboy:stop_listener('mqtt:wss');
-stop_listener({Proto, ListenOn, _Opts}) ->
+stop_listener(Proto, ListenOn, _Opts) ->
     esockd:close(Proto, ListenOn).
 
 merge_default(Options) ->
@@ -111,3 +141,10 @@ merge_default(Options) ->
             [{tcp_options, ?MQTT_SOCKOPTS} | Options]
     end.
 
+format(Port) when is_integer(Port) ->
+    io_lib:format("0.0.0.0:~w", [Port]);
+format({Addr, Port}) when is_list(Addr) ->
+    io_lib:format("~s:~w", [Addr, Port]);
+format({Addr, Port}) when is_tuple(Addr) ->
+    io_lib:format("~s:~w", [esockd_net:ntoab(Addr), Port]).
+