Ver código fonte

Merge pull request #341 from emqtt/dev-feng

Fix issue #266, More metrics, Code Refactor
Feng Lee 10 anos atrás
pai
commit
4b43ff9397

+ 2 - 0
include/emqttd.hrl

@@ -47,6 +47,8 @@
 %%------------------------------------------------------------------------------
 -type pubsub() :: publish | subscribe.
 
+-define(IS_PUBSUB(PS), (PS =:= publish orelse PS =:= subscribe)).
+
 %%------------------------------------------------------------------------------
 %% MQTT Topic
 %%------------------------------------------------------------------------------

+ 1 - 0
src/emqttd.app.src

@@ -1,6 +1,7 @@
 {application, emqttd,
  [
   {description, "Erlang MQTT Broker"},
+  {id, "emqttd"},
   {vsn, "0.12.1"},
   {modules, []},
   {registered, []},

+ 4 - 5
src/emqttd.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -41,7 +42,7 @@
 	{nodelay,   true}
 ]).
 
--type listener() :: {atom(), inet:port_number(), [esockd:option()]}. 
+-type listener() :: {atom(), inet:port_number(), [esockd:option()]}.
 
 %%------------------------------------------------------------------------------
 %% @doc Start emqttd application.
@@ -109,14 +110,12 @@ close_listeners(Listeners) when is_list(Listeners) ->
 close_listener({Protocol, Port, _Options}) ->
     esockd:close({Protocol, Port}).
 
-
 load_all_mods() ->
-    Mods = application:get_env(emqttd, modules, []),
-    lists:foreach(fun({Name, Opts}) -> 
+    lists:foreach(fun({Name, Opts}) ->
         Mod = list_to_atom("emqttd_mod_" ++ atom_to_list(Name)),
         Mod:load(Opts),
         lager:info("load module ~s successfully", [Name])
-    end, Mods).
+    end, env(modules)).
 
 is_mod_enabled(Name) ->
     env(modules, Name) =/= undefined.

+ 3 - 3
src/emqttd_access_control.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_access_control).
 
 -author("Feng Lee <feng@emqtt.io>").
@@ -61,8 +62,7 @@
 %%------------------------------------------------------------------------------
 -spec start_link() -> {ok, pid()} | ignore | {error, any()}.
 start_link() ->
-    {ok, AcOpts} = application:get_env(emqttd, access),
-    start_link(AcOpts).
+    start_link(emqttd:env(access)).
 
 -spec start_link(AcOpts :: list()) -> {ok, pid()} | ignore | {error, any()}.
 start_link(AcOpts) ->
@@ -92,7 +92,7 @@ auth(Client, Password, [{Mod, State} | Mods]) ->
       Client :: mqtt_client(),
       PubSub :: pubsub(),
       Topic  :: binary().
-check_acl(Client, PubSub, Topic) when PubSub =:= publish orelse PubSub =:= subscribe ->
+check_acl(Client, PubSub, Topic) when ?IS_PUBSUB(PubSub) ->
     case lookup_mods(acl) of
         [] -> allow;
         AclMods -> check_acl(Client, PubSub, Topic, AclMods)

+ 1 - 0
src/emqttd_access_rule.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_access_rule).
 
 -author("Feng Lee <feng@emqtt.io>").

+ 1 - 0
src/emqttd_acl_internal.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_acl_internal).
 
 -author("Feng Lee <feng@emqtt.io>").

+ 39 - 21
src/emqttd_alarm.erl

@@ -27,25 +27,35 @@
 
 -module(emqttd_alarm).
 
+-author("Feng Lee <feng@emqtt.io>").
+
 -include("emqttd.hrl").
 
+-behaviour(gen_event).
+
+-define(ALARM_MGR, ?MODULE).
+
+%% API Function Exports
 -export([start_link/0, alarm_fun/0, get_alarms/0,
          set_alarm/1, clear_alarm/1,
          add_alarm_handler/1, add_alarm_handler/2,
          delete_alarm_handler/1]).
 
+%% gen_event callbacks
 -export([init/1, handle_event/2, handle_call/2, handle_info/2,
-         terminate/2]).
+         terminate/2, code_change/3]).
 
--define(SERVER, ?MODULE).
+%%%=============================================================================
+%%% API
+%%%=============================================================================
 
 start_link() ->
-    case gen_event:start_link({local, ?SERVER}) of
-	{ok, Pid} ->
-	    gen_event:add_handler(?SERVER, ?MODULE, []),
-	    {ok, Pid};
-	Error ->
-        Error
+    start_with(fun(Pid) -> gen_event:add_handler(Pid, ?MODULE, []) end).
+
+start_with(Fun) ->
+    case gen_event:start_link({local, ?ALARM_MGR}) of
+        {ok, Pid} -> Fun(Pid), {ok, Pid};
+        Error     -> Error
     end.
 
 alarm_fun() ->
@@ -60,34 +70,36 @@ alarm_fun(Bool) ->
 
 -spec set_alarm(mqtt_alarm()) -> ok.
 set_alarm(Alarm) when is_record(Alarm, mqtt_alarm) ->
-    gen_event:notify(?SERVER, {set_alarm, Alarm}).
+    gen_event:notify(?ALARM_MGR, {set_alarm, Alarm}).
 
 -spec clear_alarm(any()) -> ok.
 clear_alarm(AlarmId) when is_binary(AlarmId) ->
-    gen_event:notify(?SERVER, {clear_alarm, AlarmId}).
+    gen_event:notify(?ALARM_MGR, {clear_alarm, AlarmId}).
 
+-spec get_alarms() -> list(mqtt_alarm()).
 get_alarms() ->
-    gen_event:call(?SERVER, ?MODULE, get_alarms).
+    gen_event:call(?ALARM_MGR, ?MODULE, get_alarms).
 
 add_alarm_handler(Module) when is_atom(Module) ->
-    gen_event:add_handler(?SERVER, Module, []).
+    gen_event:add_handler(?ALARM_MGR, Module, []).
 
 add_alarm_handler(Module, Args) when is_atom(Module) ->
-    gen_event:add_handler(?SERVER, Module, Args).
+    gen_event:add_handler(?ALARM_MGR, Module, Args).
 
 delete_alarm_handler(Module) when is_atom(Module) ->
-    gen_event:delete_handler(?SERVER, Module, []).
+    gen_event:delete_handler(?ALARM_MGR, Module, []).
+
+%%%=============================================================================
+%%% Default Alarm handler
+%%%=============================================================================
 
-%%-----------------------------------------------------------------
-%% Default Alarm handler
-%%-----------------------------------------------------------------
 init(_) ->
     {ok, []}.
     
-handle_event({set_alarm, Alarm = #mqtt_alarm{id = AlarmId,
+handle_event({set_alarm, Alarm = #mqtt_alarm{id       = AlarmId,
                                              severity = Severity,
-                                             title = Title,
-                                             summary = Summary}}, Alarms)->
+                                             title    = Title,
+                                             summary  = Summary}}, Alarms)->
     Timestamp = os:timestamp(),
     Json = mochijson2:encode([{id, AlarmId},
                               {severity, Severity},
@@ -120,6 +132,13 @@ terminate(swap, Alarms) ->
 terminate(_, _) ->
     ok.
 
+code_change(_OldVsn, State, _Extra) ->
+    {ok, State}.
+
+%%%=============================================================================
+%%% Internal functions
+%%%=============================================================================
+
 alarm_msg(Type, AlarmId, Json) ->
     Msg = emqttd_message:make(alarm,
                               topic(Type, AlarmId),
@@ -132,4 +151,3 @@ topic(alert, AlarmId) ->
 topic(clear, AlarmId) ->
     emqttd_topic:systop(<<"alarms/", AlarmId/binary, "/clear">>).
 
-

+ 31 - 33
src/emqttd_app.erl

@@ -29,15 +29,13 @@
 
 -author("Feng Lee <feng@emqtt.io>").
 
+-include("emqttd_cli.hrl").
+
 -behaviour(application).
 
 %% Application callbacks
 -export([start/2, stop/1]).
 
--define(PRINT_MSG(Msg), io:format(Msg)).
-
--define(PRINT(Format, Args), io:format(Format, Args)).
-
 %%%=============================================================================
 %%% Application callbacks
 %%%=============================================================================
@@ -106,35 +104,35 @@ start_server(Sup, {Name, Server, Opts}) ->
     start_child(Sup, Server, Opts),
     ?PRINT_MSG("[done]~n").
 
-start_child(Sup, {supervisor, Name}) ->
-    supervisor:start_child(Sup, supervisor_spec(Name));
-start_child(Sup, Name) when is_atom(Name) ->
-    {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name)).
-
-start_child(Sup, {supervisor, Name}, Opts) ->
-    supervisor:start_child(Sup, supervisor_spec(Name, Opts));
-start_child(Sup, Name, Opts) when is_atom(Name) ->
-    {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Name, Opts)).
-
-%%TODO: refactor...
-supervisor_spec(Name) ->
-    {Name,
-        {Name, start_link, []},
-            permanent, infinity, supervisor, [Name]}.
-
-supervisor_spec(Name, Opts) ->
-    {Name,
-        {Name, start_link, [Opts]},
-            permanent, infinity, supervisor, [Name]}.
-
-worker_spec(Name) ->
-    {Name,
-        {Name, start_link, []},
-            permanent, 10000, worker, [Name]}.
-worker_spec(Name, Opts) -> 
-    {Name,
-        {Name, start_link, [Opts]},
-            permanent, 10000, worker, [Name]}.
+start_child(Sup, {supervisor, Module}) ->
+    supervisor:start_child(Sup, supervisor_spec(Module));
+
+start_child(Sup, Module) when is_atom(Module) ->
+    {ok, _ChiId} = supervisor:start_child(Sup, worker_spec(Module)).
+
+start_child(Sup, {supervisor, Module}, Opts) ->
+    supervisor:start_child(Sup, supervisor_spec(Module, Opts));
+
+start_child(Sup, Module, Opts) when is_atom(Module) ->
+    supervisor:start_child(Sup, worker_spec(Module, Opts)).
+
+supervisor_spec(Module) when is_atom(Module) ->
+    supervisor_spec(Module, start_link, []).
+
+supervisor_spec(Module, Opts) ->
+    supervisor_spec(Module, start_link, [Opts]).
+
+supervisor_spec(M, F, A) ->
+    {M, {M, F, A}, permanent, infinity, supervisor, [M]}.
+
+worker_spec(Module) when is_atom(Module) ->
+    worker_spec(Module, start_link, []).
+
+worker_spec(Module, Opts) when is_atom(Module) ->
+    worker_spec(Module, start_link, [Opts]).
+
+worker_spec(M, F, A) ->
+    {M, {M, F, A}, permanent, 10000, worker, [M]}.
 
 -spec stop(State :: term()) -> term().
 stop(_State) ->

+ 1 - 1
src/emqttd_auth_anonymous.erl

@@ -36,5 +36,5 @@ init(Opts) -> {ok, Opts}.
 
 check(_Client, _Password, _Opts) -> ok.
 
-description() -> "Anonymous authentication module".
+description() -> "Anonymous Authentication Module".
 

+ 10 - 5
src/emqttd_auth_clientid.erl

@@ -20,7 +20,7 @@
 %%% SOFTWARE.
 %%%-----------------------------------------------------------------------------
 %%% @doc
-%%% ClientId authentication module.
+%%% ClientId Authentication Module.
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
@@ -51,22 +51,25 @@
 %% @doc Add clientid
 %% @end
 %%------------------------------------------------------------------------------
+-spec add_clientid(binary()) -> {atomic, ok} | {aborted, any()}.
 add_clientid(ClientId) when is_binary(ClientId) ->
     R = #mqtt_auth_clientid{client_id = ClientId},
-    mnesia:transaction(fun() -> mnesia:write(R) end).
+    mnesia:transaction(fun mnesia:write/1, [R]).
 
 %%------------------------------------------------------------------------------
 %% @doc Add clientid with password
 %% @end
 %%------------------------------------------------------------------------------
+-spec add_clientid(binary(), binary()) -> {atomic, ok} | {aborted, any()}.
 add_clientid(ClientId, Password) ->
     R = #mqtt_auth_clientid{client_id = ClientId, password = Password},
-    mnesia:transaction(fun() -> mnesia:write(R) end).
+    mnesia:transaction(fun mnesia:write/1, [R]).
 
 %%------------------------------------------------------------------------------
 %% @doc Lookup clientid
 %% @end
 %%------------------------------------------------------------------------------
+-spec lookup_clientid(binary()) -> list().
 lookup_clientid(ClientId) ->
 	mnesia:dirty_read(?AUTH_CLIENTID_TAB, ClientId).
 
@@ -74,6 +77,7 @@ lookup_clientid(ClientId) ->
 %% @doc Lookup all clientids
 %% @end
 %%------------------------------------------------------------------------------
+-spec all_clientids() -> list(binary()).
 all_clientids() ->
 	mnesia:dirty_all_keys(?AUTH_CLIENTID_TAB).
 
@@ -81,8 +85,9 @@ all_clientids() ->
 %% @doc Remove clientid
 %% @end
 %%------------------------------------------------------------------------------
+-spec remove_clientid(binary()) -> {atomic, ok} | {aborted, any()}.
 remove_clientid(ClientId) ->
-    mnesia:transaction(fun() -> mnesia:delete({?AUTH_CLIENTID_TAB, ClientId}) end).
+    mnesia:transaction(fun mnesia:delete/1, [{?AUTH_CLIENTID_TAB, ClientId}]).
 
 %%%=============================================================================
 %%% emqttd_auth_mod callbacks
@@ -95,7 +100,7 @@ init(Opts) ->
 	mnesia:add_table_copy(?AUTH_CLIENTID_TAB, node(), ram_copies),
     case proplists:get_value(file, Opts) of
         undefined -> ok;
-        File -> load(File)
+        File      -> load(File)
     end,
 	{ok, Opts}.
 

+ 2 - 2
src/emqttd_auth_ldap.erl

@@ -20,7 +20,7 @@
 %%% SOFTWARE.
 %%%-----------------------------------------------------------------------------
 %%% @doc
-%%% LDAP Authentication Module.
+%%% LDAP Authentication Module
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
@@ -28,7 +28,7 @@
 
 -author("Feng Lee <feng@emqtt.io>").
 
--include_lib("emqttd/include/emqttd.hrl").
+-include("emqttd.hrl").
 
 -import(proplists, [get_value/2, get_value/3]).
 

+ 3 - 3
src/emqttd_auth_mod.erl

@@ -26,7 +26,7 @@
 %%%-----------------------------------------------------------------------------
 -module(emqttd_auth_mod).
 
--author('feng@emqtt.io').
+-author("Feng Lee <feng@emqtt.io>").
 
 -include("emqttd.hrl").
 
@@ -50,9 +50,9 @@
 -export([behaviour_info/1]).
 
 behaviour_info(callbacks) ->
-        [{init, 1}, {check, 3}, {description, 0}];
+    [{init, 1}, {check, 3}, {description, 0}];
 behaviour_info(_Other) ->
-        undefined.
+    undefined.
 
 -endif.
 

+ 28 - 8
src/emqttd_auth_username.erl

@@ -20,13 +20,13 @@
 %%% SOFTWARE.
 %%%-----------------------------------------------------------------------------
 %%% @doc
-%%% emqttd authentication with username and password.
+%%% Authentication with username and password.
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
 -module(emqttd_auth_username).
 
--author('feng@emqtt.io').
+-author("Feng Lee <feng@emqtt.io>").
 
 -include("emqttd.hrl").
 
@@ -65,16 +65,36 @@ cli(_) ->
 %%% API 
 %%%=============================================================================
 
+%%------------------------------------------------------------------------------
+%% @doc Add user
+%% @end
+%%------------------------------------------------------------------------------
+-spec add_user(binary(), binary()) -> {atomic, ok} | {aborted, any()}.
 add_user(Username, Password) ->
-    R = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)},
-    mnesia:transaction(fun() -> mnesia:write(R) end).
-
+    User = #?AUTH_USERNAME_TAB{username = Username, password = hash(Password)},
+    mnesia:transaction(fun mnesia:write/1, [User]).
+
+%%------------------------------------------------------------------------------
+%% @doc Lookup user by username
+%% @end
+%%------------------------------------------------------------------------------
+-spec lookup_user(binary()) -> list().
 lookup_user(Username) ->
     mnesia:dirty_read(?AUTH_USERNAME_TAB, Username).
 
+%%------------------------------------------------------------------------------
+%% @doc Remove user
+%% @end
+%%------------------------------------------------------------------------------
+-spec remove_user(binary()) -> {atomic, ok} | {aborted, any()}.
 remove_user(Username) ->
-    mnesia:transaction(fun() -> mnesia:delete({?AUTH_USERNAME_TAB, Username}) end).
+    mnesia:transaction(fun mnesia:delete/1, [{?AUTH_USERNAME_TAB, Username}]).
 
+%%------------------------------------------------------------------------------
+%% @doc All usernames
+%% @end
+%%------------------------------------------------------------------------------
+-spec all_users() -> list().
 all_users() ->
     mnesia:dirty_all_keys(?AUTH_USERNAME_TAB).
 
@@ -104,7 +124,8 @@ check(#mqtt_client{username = Username}, Password, _Opts) ->
             end
 	end.
 	
-description() -> "Username password authentication module".
+description() ->
+    "Username password authentication module".
 
 %%%=============================================================================
 %%% Internal functions
@@ -123,4 +144,3 @@ salt() ->
     Salt = random:uniform(16#ffffffff),
     <<Salt:32>>.
 
-

+ 6 - 6
src/emqttd_bridge.erl

@@ -144,12 +144,12 @@ handle_info({nodeup, Node}, State = #state{node = Node}) ->
 handle_info(ping_down_node, State = #state{node = Node, ping_down_interval = Interval}) ->
     Self = self(),
     spawn_link(fun() ->
-                     case net_kernel:connect_node(Node) of
-                         true -> %%TODO: this is not right... fixme later
-                             Self ! {nodeup, Node};
-                         false ->
-                             erlang:send_after(Interval, Self, ping_down_node)
-                     end
+                 case net_kernel:connect_node(Node) of
+                     true -> %%TODO: this is not right... fixme later
+                         Self ! {nodeup, Node};
+                     false ->
+                         erlang:send_after(Interval, Self, ping_down_node)
+                 end
                end),
     {noreply, State};
 

+ 4 - 6
src/emqttd_bridge_sup.erl

@@ -38,12 +38,6 @@
 
 -export([init/1]).
 
-%%%=============================================================================
-%%% CLI
-%%%=============================================================================
-
-
-
 %%%=============================================================================
 %%% API
 %%%=============================================================================
@@ -55,6 +49,10 @@
 start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
+%%------------------------------------------------------------------------------
+%% @doc List all bridges
+%% @end
+%%------------------------------------------------------------------------------
 -spec bridges() -> [{tuple(), pid()}].
 bridges() ->
     [{{Node, SubTopic}, Pid} || {{bridge, Node, SubTopic}, Pid, worker, _} 

+ 6 - 5
src/emqttd_broker.erl

@@ -84,6 +84,7 @@ start_link() ->
 %% @doc Get running nodes
 %% @end
 %%------------------------------------------------------------------------------
+-spec running_nodes() -> list(node()).
 running_nodes() ->
     mnesia:system_info(running_db_nodes).
 
@@ -109,7 +110,7 @@ notify(EventType, Event) ->
 %% @end
 %%------------------------------------------------------------------------------
 env(Name) ->
-    proplists:get_value(Name, application:get_env(emqttd, broker, [])).
+    proplists:get_value(Name, emqttd:env(broker)).
 
 %%------------------------------------------------------------------------------
 %% @doc Get broker version
@@ -152,7 +153,7 @@ datetime() ->
 %%------------------------------------------------------------------------------
 -spec hook(Hook :: atom(), Name :: any(), MFA :: mfa()) -> ok | {error, any()}.
 hook(Hook, Name, MFA) ->
-    gen_server:call(?MODULE, {hook, Hook, Name, MFA}).
+    gen_server:call(?SERVER, {hook, Hook, Name, MFA}).
 
 %%------------------------------------------------------------------------------
 %% @doc Unhook
@@ -160,7 +161,7 @@ hook(Hook, Name, MFA) ->
 %%------------------------------------------------------------------------------
 -spec unhook(Hook :: atom(), Name :: any()) -> ok | {error, any()}.
 unhook(Hook, Name) ->
-    gen_server:call(?MODULE, {unhook, Hook, Name}).
+    gen_server:call(?SERVER, {unhook, Hook, Name}).
 
 %%------------------------------------------------------------------------------
 %% @doc Foreach hooks
@@ -266,13 +267,13 @@ handle_cast(_Msg, State) ->
 handle_info(heartbeat, State) ->
     publish(uptime, list_to_binary(uptime(State))),
     publish(datetime, list_to_binary(datetime())),
-    {noreply, State, hibernate};
+    {noreply, State};
 
 handle_info(tick, State) ->
     retain(brokers),
     retain(version,  list_to_binary(version())),
     retain(sysdescr, list_to_binary(sysdescr())),
-    {noreply, State, hibernate};
+    {noreply, State};
 
 handle_info(_Info, State) ->
     {noreply, State}.

+ 5 - 3
src/emqttd_cli.erl

@@ -223,8 +223,10 @@ plugins(["list"]) ->
 
 plugins(["load", Name]) ->
     case emqttd_plugins:load(list_to_atom(Name)) of
-        {ok, StartedApps} -> ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
-        {error, Reason} -> ?PRINT("load plugin error: ~p~n", [Reason])
+        {ok, StartedApps} ->
+            ?PRINT("Start apps: ~p~nPlugin ~s loaded successfully.~n", [StartedApps, Name]);
+        {error, Reason}   ->
+            ?PRINT("load plugin error: ~p~n", [Reason])
     end;
 
 plugins(["unload", Name]) ->
@@ -236,7 +238,7 @@ plugins(["unload", Name]) ->
     end;
 
 plugins(_) ->
-    ?USAGE([{"plugins list",            "query loaded plugins"},
+    ?USAGE([{"plugins list",            "show loaded plugins"},
             {"plugins load <Plugin>",   "load plugin"},
             {"plugins unload <Plugin>", "unload plugin"}]).
 

+ 15 - 33
src/emqttd_client.erl

@@ -110,7 +110,7 @@ handle_call(kick, _From, State) ->
     {stop, {shutdown, kick}, ok, State};
 
 handle_call(Req, _From, State = #state{peername = Peername}) ->
-    lager:error("Client ~s: unexpected request - ~p", [emqttd_net:format(Peername), Req]),
+    lager:error("Client(~s): unexpected request - ~p", [emqttd_net:format(Peername), Req]),
     {reply, {error, unsupported_request}, State}.    
 
 handle_cast({subscribe, TopicTable}, State) ->
@@ -120,7 +120,7 @@ handle_cast({unsubscribe, Topics}, State) ->
     with_session(fun(SessPid) -> emqttd_session:unsubscribe(SessPid, Topics) end, State);
 
 handle_cast(Msg, State = #state{peername = Peername}) ->
-    lager:error("Client ~s: unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
+    lager:error("Client(~s): unexpected msg - ~p",[emqttd_net:format(Peername), Msg]),
     {noreply, State}.
 
 handle_info(timeout, State) ->
@@ -152,11 +152,12 @@ handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
     network_error(Reason, State);
 
 handle_info({inet_reply, _Sock, {error, Reason}}, State = #state{peername = Peername}) ->
-    lager:error("Client ~s: unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]),
+    lager:error("Client(~s): unexpected inet_reply '~p'", [emqttd_net:format(Peername), Reason]),
     {noreply, State};
 
 handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport, socket = Socket, peername = Peername}) ->
-    lager:debug("Client ~s: Start KeepAlive with ~p seconds", [emqttd_net:format(Peername), TimeoutSec]),
+    lager:debug("Client(~s): Start KeepAlive with ~p seconds",
+                  [emqttd_net:format(Peername), TimeoutSec]),
     StatFun = fun() ->
             case Transport:getstat(Socket, [recv_oct]) of
                 {ok, [{recv_oct, RecvOct}]} -> {ok, RecvOct};
@@ -169,13 +170,12 @@ handle_info({keepalive, start, TimeoutSec}, State = #state{transport = Transport
 handle_info({keepalive, check}, State = #state{peername = Peername, keepalive = KeepAlive}) ->
     case emqttd_keepalive:check(KeepAlive) of
     {ok, KeepAlive1} ->
-        lager:debug("Client ~s: Keepalive Resumed", [emqttd_net:format(Peername)]),
         noreply(State#state{keepalive = KeepAlive1});
     {error, timeout} ->
-        lager:debug("Client ~s: Keepalive Timeout!", [emqttd_net:format(Peername)]),
+        lager:debug("Client(~s): Keepalive Timeout!", [emqttd_net:format(Peername)]),
         stop({shutdown, keepalive_timeout}, State#state{keepalive = undefined});
     {error, Error} ->
-        lager:debug("Client ~s: Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]),
+        lager:debug("Client(~s): Keepalive Error: ~p!", [emqttd_net:format(Peername), Error]),
         stop({shutdown, keepalive_error}, State#state{keepalive = undefined})
     end;
 
@@ -183,10 +183,10 @@ handle_info(Info, State = #state{peername = Peername}) ->
     lager:error("Client ~s: unexpected info ~p",[emqttd_net:format(Peername), Info]),
     {noreply, State}.
 
-terminate(Reason, #state{peername = Peername,
-                         transport = Transport,
-                         socket = Socket,
-                         keepalive = KeepAlive,
+terminate(Reason, #state{peername    = Peername,
+                         transport   = Transport,
+                         socket      = Socket,
+                         keepalive   = KeepAlive,
                          proto_state = ProtoState}) ->
     lager:info("Client(~s) terminated, reason: ~p", [emqttd_net:format(Peername), Reason]),
     emqttd_keepalive:cancel(KeepAlive),
@@ -228,9 +228,9 @@ received(Bytes, State = #state{packet_opts = PacketOpts,
                                conn_name   = ConnStr}) ->
     case Parser(Bytes) of
     {more, NewParser} ->
-        {noreply, control_throttle(State #state{parser = NewParser}), hibernate};
+        noreply(control_throttle(State#state{parser = NewParser}));
     {ok, Packet, Rest} ->
-        received_stats(Packet),
+        emqttd_metrics:received(Packet),
         case emqttd_protocol:received(Packet, ProtoState) of
         {ok, ProtoState1} ->
             received(Rest, State#state{parser = emqttd_parser:new(PacketOpts),
@@ -244,12 +244,12 @@ received(Bytes, State = #state{packet_opts = PacketOpts,
             stop(Reason, State#state{proto_state = ProtoState1})
         end;
     {error, Error} ->
-        lager:error("MQTT detected framing error ~p for connection ~p", [Error, ConnStr]),
+        lager:error("MQTT framing error ~p for connection ~p", [Error, ConnStr]),
         stop({shutdown, Error}, State)
     end.
 
 network_error(Reason, State = #state{peername = Peername}) ->
-    lager:warning("Client ~s: MQTT detected network error '~p'",
+    lager:warning("Client(~s): MQTT detected network error '~p'",
                     [emqttd_net:format(Peername), Reason]),
     stop({shutdown, conn_closed}, State).
 
@@ -269,21 +269,3 @@ control_throttle(State = #state{conn_state = Flow,
         {_,            _} -> run_socket(State)
     end.
 
-received_stats(?PACKET(Type)) ->
-    emqttd_metrics:inc('packets/received'), inc(Type).
-inc(?CONNECT) ->
-    emqttd_metrics:inc('packets/connect');
-inc(?PUBLISH) ->
-    emqttd_metrics:inc('messages/received'),
-    emqttd_metrics:inc('packets/publish/received');
-inc(?SUBSCRIBE) ->
-    emqttd_metrics:inc('packets/subscribe');
-inc(?UNSUBSCRIBE) ->
-    emqttd_metrics:inc('packets/unsubscribe');
-inc(?PINGREQ) ->
-    emqttd_metrics:inc('packets/pingreq');
-inc(?DISCONNECT) ->
-    emqttd_metrics:inc('packets/disconnect');
-inc(_) ->
-    ignore.
-

+ 5 - 2
src/emqttd_ctl.erl

@@ -139,7 +139,10 @@ code_change(_OldVsn, State, _Extra) ->
 %%% Internal Function Definitions
 %%%=============================================================================
 
-noreply(State) -> {noreply, State, hibernate}.
+noreply(State) ->
+    {noreply, State, hibernate}.
+
+next_seq(State = #state{seq = Seq}) ->
+    State#state{seq = Seq + 1}.
 
-next_seq(State = #state{seq = Seq}) -> State#state{seq = Seq + 1}.
 

+ 5 - 3
src/emqttd_dist.erl

@@ -27,6 +27,8 @@
 
 -module(emqttd_dist).
 
+-import(lists, [concat/1]).
+
 -export([parse_node/1]).
 
 parse_node(Name) when is_list(Name) ->
@@ -40,10 +42,10 @@ parse_node(Name) when is_list(Name) ->
 with_domain(Name) ->
     case net_kernel:longnames() of
     true ->
-        Name ++ "@" ++ inet_db:gethostname() ++
-             "." ++ inet_db:res_option(domain);
+        concat([Name, "@", inet_db:gethostname(),
+                  ".", inet_db:res_option(domain)]);
     false ->
-        Name ++ "@" ++ inet_db:gethostname();
+        concat([Name, "@", inet_db:gethostname()]);
     _ ->
         Name
     end.

+ 2 - 2
src/emqttd_gen_mod.erl

@@ -41,9 +41,9 @@
 -export([behaviour_info/1]).
 
 behaviour_info(callbacks) ->
-        [{load, 1}, {unload, 1}];
+    [{load, 1}, {unload, 1}];
 behaviour_info(_Other) ->
-        undefined.
+    undefined.
 
 -endif.
 

+ 9 - 8
src/emqttd_http.erl

@@ -30,6 +30,7 @@
 -author("Feng Lee <feng@emqtt.io>").
 
 -include("emqttd.hrl").
+
 -include("emqttd_protocol.hrl").
 
 -import(proplists, [get_value/2, get_value/3]).
@@ -39,14 +40,14 @@
 handle_request(Req) ->
     handle_request(Req:get(method), Req:get(path), Req).
 
-handle_request('GET', "/mqtt/status", Req) ->
+handle_request('GET', "/status", Req) ->
     {InternalStatus, _ProvidedStatus} = init:get_status(),
     AppStatus =
     case lists:keysearch(emqttd, 1, application:which_applications()) of
         false         -> not_running;
         {value, _Ver} -> running
     end,
-    Status = io_lib:format("Node ~s is ~s~nemqttd is ~s~n",
+    Status = io_lib:format("Node ~s is ~s~nemqttd is ~s",
                             [node(), InternalStatus, AppStatus]),
     Req:ok({"text/plain", iolist_to_binary(Status)});
 
@@ -59,15 +60,15 @@ handle_request('POST', "/mqtt/publish", Req) ->
 	case authorized(Req) of
 	true ->
         ClientId = get_value("client", Params, http),
-        Qos = int(get_value("qos", Params, "0")),
-        Retain = bool(get_value("retain", Params,  "0")),
-        Topic = list_to_binary(get_value("topic", Params)),
-        Payload = list_to_binary(get_value("message", Params)),
+        Qos      = int(get_value("qos", Params, "0")),
+        Retain   = bool(get_value("retain", Params,  "0")),
+        Topic    = list_to_binary(get_value("topic", Params)),
+        Payload  = list_to_binary(get_value("message", Params)),
         case {validate(qos, Qos), validate(topic, Topic)} of
             {true, true} ->
                 Msg = emqttd_message:make(ClientId, Qos, Topic, Payload),
                 emqttd_pubsub:publish(Msg#mqtt_message{retain  = Retain}),
-                Req:ok({"text/plain", <<"ok\n">>});
+                Req:ok({"text/plain", <<"ok">>});
            {false, _} ->
                 Req:respond({400, [], <<"Bad QoS">>});
             {_, false} ->
@@ -83,7 +84,7 @@ handle_request('POST', "/mqtt/publish", Req) ->
 handle_request('GET', "/mqtt", Req) ->
     lager:info("Websocket Connection from: ~s", [Req:get(peer)]),
     Upgrade = Req:get_header_value("Upgrade"),
-    Proto = Req:get_header_value("Sec-WebSocket-Protocol"),
+    Proto   = Req:get_header_value("Sec-WebSocket-Protocol"),
     case {is_websocket(Upgrade), Proto} of
         {true, "mqtt" ++ _Vsn} ->
             emqttd_ws_client:start_link(Req);

+ 5 - 0
src/emqttd_keepalive.erl

@@ -34,10 +34,13 @@
                     tsec, tmsg, tref,
                     repeat = 0}).
 
+-type keepalive() :: #keepalive{}.
+
 %%------------------------------------------------------------------------------
 %% @doc Start a keepalive
 %% @end
 %%------------------------------------------------------------------------------
+-spec start(fun(), integer(), any()) -> undefined | keepalive().
 start(_, 0, _) ->
     undefined;
 start(StatFun, TimeoutSec, TimeoutMsg) ->
@@ -50,6 +53,7 @@ start(StatFun, TimeoutSec, TimeoutMsg) ->
 %% @doc Check keepalive, called when timeout.
 %% @end
 %%------------------------------------------------------------------------------
+-spec check(keepalive()) -> {ok, keepalive()} | {error, any()}.
 check(KeepAlive = #keepalive{statfun = StatFun, statval = LastVal, repeat = Repeat}) ->
     case StatFun() of
         {ok, NewVal} ->
@@ -71,6 +75,7 @@ resume(KeepAlive = #keepalive{tsec = TimeoutSec, tmsg = TimeoutMsg}) ->
 %% @doc Cancel Keepalive
 %% @end
 %%------------------------------------------------------------------------------
+-spec cancel(keepalive()) -> ok.
 cancel(#keepalive{tref = TRef}) ->
     cancel(TRef);
 cancel(undefined) -> 

+ 1 - 0
src/emqttd_log.erl

@@ -30,3 +30,4 @@
 
 -module(emqttd_log).
 
+%%TODO: Hooks to log???

+ 87 - 1
src/emqttd_metrics.erl

@@ -31,6 +31,8 @@
 
 -include("emqttd.hrl").
 
+-include("emqttd_protocol.hrl").
+
 -behaviour(gen_server).
 
 -define(SERVER, ?MODULE).
@@ -38,6 +40,9 @@
 %% API Function Exports
 -export([start_link/0]).
 
+%% Received/Sent Metrics
+-export([received/1, sent/1]).
+
 -export([all/0, value/1,
          inc/1, inc/2, inc/3,
          dec/2, dec/3,
@@ -65,6 +70,14 @@
     {counter, 'packets/connack'},          % CONNACK Packets sent
     {counter, 'packets/publish/received'}, % PUBLISH packets received
     {counter, 'packets/publish/sent'},     % PUBLISH packets sent
+    {counter, 'packets/puback/received'},  % PUBACK packets received
+    {counter, 'packets/puback/sent'},      % PUBACK packets sent
+    {counter, 'packets/pubrec/received'},  % PUBREC packets received
+    {counter, 'packets/pubrec/sent'},      % PUBREC packets sent
+    {counter, 'packets/pubrel/received'},  % PUBREL packets received
+    {counter, 'packets/pubrel/sent'},      % PUBREL packets sent
+    {counter, 'packets/pubcomp/received'}, % PUBCOMP packets received
+    {counter, 'packets/pubcomp/sent'},     % PUBCOMP packets sent
     {counter, 'packets/subscribe'},        % SUBSCRIBE Packets received 
     {counter, 'packets/suback'},           % SUBACK packets sent 
     {counter, 'packets/unsubscribe'},      % UNSUBSCRIBE Packets received
@@ -78,6 +91,12 @@
 -define(SYSTOP_MESSAGES, [
     {counter, 'messages/received'},      % Messages received
     {counter, 'messages/sent'},          % Messages sent
+    {counter, 'messages/qos0/received'}, % Messages received
+    {counter, 'messages/qos0/sent'},     % Messages sent
+    {counter, 'messages/qos1/received'}, % Messages received
+    {counter, 'messages/qos1/sent'},     % Messages sent
+    {counter, 'messages/qos2/received'}, % Messages received
+    {counter, 'messages/qos2/sent'},     % Messages sent
     {gauge,   'messages/retained'},      % Messagea retained
     {counter, 'messages/dropped'}        % Messages dropped
 ]).
@@ -94,6 +113,73 @@
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
+received(Packet = ?PACKET(Type)) ->
+    inc('packets/received'),
+    received(Type, Packet).
+received(?CONNECT, _Packet) ->
+    inc('packets/connect');
+received(?PUBLISH, ?PUBLISH(Qos, _PktId)) ->
+    inc('packets/publish/received'),
+    inc('messages/received'),
+    qos_received(Qos);
+received(?PUBACK, _Packet) ->  
+    inc('packets/puback/received');
+received(?PUBREC, _Packet) ->  
+    inc('packets/pubrec/received');
+received(?PUBREL, _Packet) ->
+    inc('packets/pubrel/received');
+received(?PUBCOMP, _Packet) ->
+    inc('packets/pubcomp/received');
+received(?SUBSCRIBE, _Packet) ->
+    inc('packets/subscribe');
+received(?UNSUBSCRIBE, _Packet) ->
+    inc('packets/unsubscribe');
+received(?PINGREQ, _Packet) ->
+    inc('packets/pingreq');
+received(?DISCONNECT, _Packet) ->
+    inc('packets/disconnect');
+received(_, _) -> ignore.
+
+qos_received(?QOS_0) ->
+    inc('messages/qos0/received');
+qos_received(?QOS_1) ->
+    inc('messages/qos1/received');
+qos_received(?QOS_2) ->
+    inc('messages/qos2/received').
+
+sent(Packet = ?PACKET(Type)) ->
+    emqttd_metrics:inc('packets/sent'),
+    sent(Type, Packet).
+sent(?CONNACK, _Packet) ->
+    inc('packets/connack');
+sent(?PUBLISH, ?PUBLISH(Qos, _PktId)) ->
+    inc('packets/publish/sent'),
+    inc('messages/sent'),
+    qos_sent(Qos);
+sent(?PUBACK, _Packet) ->
+    inc('packets/puback/sent');
+sent(?PUBREC, _Packet) ->
+    inc('packets/pubrec/sent');
+sent(?PUBREL, _Packet) ->
+    inc('packets/pubrel/sent');
+sent(?PUBCOMP, _Packet) ->
+    inc('packets/pubcomp/sent');
+sent(?SUBACK, _Packet) ->
+    inc('packets/suback');
+sent(?UNSUBACK, _Packet) ->
+    inc('packets/unsuback');
+sent(?PINGRESP, _Packet) ->
+    inc('packets/pingresp');
+sent(_Type, _Packet) ->
+    ingore.
+
+qos_sent(?QOS_0) ->
+    inc('messages/qos0/sent');
+qos_sent(?QOS_1) ->
+    inc('messages/qos1/sent');
+qos_sent(?QOS_2) ->
+    inc('messages/qos2/sent').
+
 %%------------------------------------------------------------------------------
 %% @doc Get all metrics
 %% @end
@@ -198,7 +284,7 @@ init([]) ->
     {ok, #state{tick_tref = emqttd_broker:start_tick(tick)}, hibernate}.
 
 handle_call(_Req, _From, State) ->
-    {reply, error,  State}.
+    {reply, error, State}.
 
 handle_cast(_Msg, State) ->
     {noreply, State}.

+ 1 - 1
src/emqttd_mnesia.erl

@@ -27,7 +27,7 @@
 
 -module(emqttd_mnesia).
 
--author('feng@emqtt.io').
+-author("Feng Lee <feng@emqtt.io>").
 
 -include("emqttd.hrl").
 

+ 4 - 2
src/emqttd_mod_presence.erl

@@ -38,8 +38,10 @@
 -export([client_connected/3, client_disconnected/3]).
 
 load(Opts) ->
-    emqttd_broker:hook('client.connected', {?MODULE, client_connected}, {?MODULE, client_connected, [Opts]}),
-    emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected}, {?MODULE, client_disconnected, [Opts]}),
+    emqttd_broker:hook('client.connected', {?MODULE, client_connected},
+                        {?MODULE, client_connected, [Opts]}),
+    emqttd_broker:hook('client.disconnected', {?MODULE, client_disconnected},
+                        {?MODULE, client_disconnected, [Opts]}),
     {ok, Opts}.
 
 client_connected(ConnAck, #mqtt_client{client_id  = ClientId,

+ 9 - 7
src/emqttd_mod_rewrite.erl

@@ -46,11 +46,11 @@ load(Opts) ->
     {ok, Terms} = file:consult(File),
     Sections = compile(Terms),
     emqttd_broker:hook('client.subscribe', {?MODULE, rewrite_subscribe}, 
-                       {?MODULE, rewrite, [subscribe, Sections]}),
+                        {?MODULE, rewrite, [subscribe, Sections]}),
     emqttd_broker:hook('client.unsubscribe', {?MODULE, rewrite_unsubscribe},
-                       {?MODULE, rewrite, [unsubscribe, Sections]}),
+                        {?MODULE, rewrite, [unsubscribe, Sections]}),
     emqttd_broker:hook('message.publish', {?MODULE, rewrite_publish},
-                       {?MODULE, rewrite, [publish, Sections]}).
+                        {?MODULE, rewrite, [publish, Sections]}).
 
 rewrite(_ClientId, TopicTable, subscribe, Sections) ->
     lager:info("rewrite subscribe: ~p", [TopicTable]),
@@ -83,9 +83,9 @@ reload(File) ->
     end.
             
 unload(_) ->
-    emqttd_broker:unhook('client.subscribe', {?MODULE, rewrite_subscribe}),
-    emqttd_broker:unhook('client.unsubscribe', {?MODULE, rewrite_unsubscribe}),
-    emqttd_broker:unhook('message.publish', {?MODULE, rewrite_publish}).
+    emqttd_broker:unhook('client.subscribe',  {?MODULE, rewrite_subscribe}),
+    emqttd_broker:unhook('client.unsubscribe',{?MODULE, rewrite_unsubscribe}),
+    emqttd_broker:unhook('message.publish',   {?MODULE, rewrite_publish}).
 
 %%%=============================================================================
 %%% Internal functions
@@ -116,7 +116,8 @@ match_rule(Topic, []) ->
 match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
     case re:run(Topic, MP, [{capture, all_but_first, list}]) of
         {match, Captured} ->
-            Vars = lists:zip(["\\$" ++ integer_to_list(I) || I <- lists:seq(1, length(Captured))], Captured),
+            Vars = lists:zip(["\\$" ++ integer_to_list(I)
+                                || I <- lists:seq(1, length(Captured))], Captured),
             iolist_to_binary(lists:foldl(
                     fun({Var, Val}, Acc) ->
                             re:replace(Acc, Var, Val, [global])
@@ -124,3 +125,4 @@ match_rule(Topic, [{rewrite, MP, Dest} | Rules]) ->
         nomatch ->
             match_rule(Topic, Rules)
     end.
+

+ 4 - 2
src/emqttd_net.erl

@@ -30,9 +30,11 @@
 
 -include_lib("kernel/include/inet.hrl").
 
--export([tcp_name/3, tcp_host/1, getopts/2, setopts/2, getaddr/2, port_to_listeners/1]).
+-export([tcp_name/3, tcp_host/1, getopts/2, setopts/2,
+         getaddr/2, port_to_listeners/1]).
 
--export([peername/1, sockname/1, format/2, format/1, connection_string/2, ntoa/1]).
+-export([peername/1, sockname/1, format/2, format/1,
+         connection_string/2, ntoa/1]).
 
 -define(FIRST_TEST_BIND_PORT, 10000).
 

+ 1 - 0
src/emqttd_opts.erl

@@ -24,6 +24,7 @@
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------
+
 -module(emqttd_opts).
 
 -author("Feng Lee <feng@emqtt.io>").

+ 2 - 4
src/emqttd_plugins.erl

@@ -41,7 +41,6 @@
 %% @doc Load all plugins when the broker started.
 %% @end
 %%------------------------------------------------------------------------------
-
 -spec load() -> list() | {error, any()}.
 load() ->
     case env(loaded_file) of
@@ -64,7 +63,7 @@ with_loaded_file(File, SuccFun) ->
 load_plugins(Names, Persistent) ->
     Plugins = list(), NotFound = Names -- names(Plugins),
     case NotFound of
-        [] -> ok;
+        []       -> ok;
         NotFound -> lager:error("Cannot find plugins: ~p", [NotFound])
     end,
     NeedToLoad = Names -- NotFound -- names(started_app),
@@ -78,7 +77,7 @@ load_plugins(Names, Persistent) ->
 unload() ->
     case env(loaded_file) of
         {ok, File} ->
-            with_loaded_file(File, fun(Names) -> stop_plugins(Names) end);
+            with_loaded_file(File, fun stop_plugins/1);
         undefined ->
             ignore
     end.
@@ -128,7 +127,6 @@ plugin(PluginsDir, AppFile0) ->
 %% @doc Load One Plugin
 %% @end
 %%------------------------------------------------------------------------------
-
 -spec load(atom()) -> ok | {error, any()}.
 load(PluginName) when is_atom(PluginName) ->
     case lists:member(PluginName, names(started_app)) of

+ 9 - 6
src/emqttd_pooler.erl

@@ -42,9 +42,12 @@
 %%%=============================================================================
 %%% API
 %%%=============================================================================
--spec start_link(I :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
-start_link(I) ->
-    gen_server:start_link(?MODULE, [I], []).
+-spec start_link(Id :: pos_integer()) -> {ok, pid()} | ignore | {error, any()}.
+start_link(Id) ->
+    gen_server:start_link({local, name(Id)}, ?MODULE, [Id], []).
+
+name(Id) ->
+    list_to_atom(lists:concat([?MODULE, "_", integer_to_list(Id)])).
 
 %%------------------------------------------------------------------------------
 %% @doc Submit work to pooler
@@ -64,9 +67,9 @@ async_submit(Fun) ->
 %%% gen_server callbacks
 %%%=============================================================================
 
-init([I]) ->
-    gproc_pool:connect_worker(pooler, {pooler, I}),
-    {ok, #state{id = I}}.
+init([Id]) ->
+    gproc_pool:connect_worker(pooler, {pooler, Id}),
+    {ok, #state{id = Id}}.
 
 handle_call({submit, Fun}, _From, State) ->
     {reply, run(Fun), State};

+ 65 - 70
src/emqttd_protocol.erl

@@ -38,7 +38,7 @@
 
 -export([received/2, send/2, redeliver/2, shutdown/2]).
 
--export([handle/2]).
+-export([process/2]).
 
 %% Protocol State
 -record(proto_state, {peername,
@@ -65,8 +65,8 @@
 %%------------------------------------------------------------------------------
 
 init(Peername, SendFun, Opts) ->
-    MaxLen = proplists:get_value(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
-    WsInitialHeaders = proplists:get_value(ws_initial_headers, Opts),
+    MaxLen = emqttd_opts:g(max_clientid_len, Opts, ?MAX_CLIENTID_LEN),
+    WsInitialHeaders = emqttd_opts:g(ws_initial_headers, Opts),
 	#proto_state{peername           = Peername,
                  sendfun            = SendFun,
                  max_clientid_len   = MaxLen,
@@ -130,7 +130,7 @@ session(#proto_state{session = Session}) ->
 %%A Client can only send the CONNECT Packet once over a Network Connection. 
 -spec received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}. 
 received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
-    handle(Packet, State#proto_state{connected = true});
+    process(Packet, State#proto_state{connected = true});
 
 received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
     {error, protocol_bad_connect, State};
@@ -143,19 +143,19 @@ received(Packet = ?PACKET(_Type), State) ->
     trace(recv, Packet, State),
 	case validate_packet(Packet) of
 	ok ->
-        handle(Packet, State);
+        process(Packet, State);
 	{error, Reason} ->
 		{error, Reason, State}
 	end.
 
-handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) ->
+process(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}) ->
 
     #mqtt_packet_connect{proto_ver  = ProtoVer,
                          proto_name = ProtoName,
                          username   = Username,
                          password   = Password,
                          clean_sess = CleanSess,
-                         keep_alive  = KeepAlive,
+                         keep_alive = KeepAlive,
                          client_id  = ClientId} = Var,
 
     State1 = State0#proto_state{proto_ver  = ProtoVer,
@@ -190,7 +190,7 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
                             exit({shutdown, Error})
                     end;
                 {error, Reason}->
-                    lager:error("~s@~s: username '~s', login failed - ~s",
+                    lager:error("~s@~s: username '~s' login failed for ~s",
                                     [ClientId, emqttd_net:format(Peername), Username, Reason]),
                     {?CONNACK_CREDENTIALS, State1}
                                                         
@@ -203,8 +203,8 @@ handle(Packet = ?CONNECT_PACKET(Var), State0 = #proto_state{peername = Peername}
     %% Send connack
     send(?CONNACK_PACKET(ReturnCode1), State3);
 
-handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
-           State = #proto_state{client_id = ClientId}) ->
+process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
+        State = #proto_state{client_id = ClientId}) ->
 
     case check_acl(publish, Topic, State) of
         allow ->
@@ -214,70 +214,76 @@ handle(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload),
     end,
 	{ok, State};
 
-handle(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
-    emqttd_session:puback(Session, PacketId),
-    {ok, State};
+process(?PUBACK_PACKET(?PUBACK, PacketId), State = #proto_state{session = Session}) ->
+    emqttd_session:puback(Session, PacketId), {ok, State};
 
-handle(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) ->
+process(?PUBACK_PACKET(?PUBREC, PacketId), State = #proto_state{session = Session}) ->
     emqttd_session:pubrec(Session, PacketId),
     send(?PUBREL_PACKET(PacketId), State);
 
-handle(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) ->
+process(?PUBACK_PACKET(?PUBREL, PacketId), State = #proto_state{session = Session}) ->
     emqttd_session:pubrel(Session, PacketId),
     send(?PUBACK_PACKET(?PUBCOMP, PacketId), State);
 
-handle(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session}) ->
-    emqttd_session:pubcomp(Session, PacketId),
-    {ok, State};
+process(?PUBACK_PACKET(?PUBCOMP, PacketId), State = #proto_state{session = Session})->
+    emqttd_session:pubcomp(Session, PacketId), {ok, State};
 
 %% protect from empty topic list
-handle(?SUBSCRIBE_PACKET(PacketId, []), State) ->
+process(?SUBSCRIBE_PACKET(PacketId, []), State) ->
     send(?SUBACK_PACKET(PacketId, []), State);
 
-handle(?SUBSCRIBE_PACKET(PacketId, TopicTable), State = #proto_state{client_id = ClientId, session = Session}) ->
+process(?SUBSCRIBE_PACKET(PacketId, TopicTable),
+        State = #proto_state{client_id = ClientId, session = Session}) ->
     AllowDenies = [check_acl(subscribe, Topic, State) || {Topic, _Qos} <- TopicTable],
     case lists:member(deny, AllowDenies) of
         true ->
-            %%TODO: return 128 QoS when deny... no need to SUBACK?
-            lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]);
+            lager:error("SUBSCRIBE from '~s' Denied: ~p", [ClientId, TopicTable]),
+            send(?SUBACK_PACKET(PacketId, [16#80 || _ <- TopicTable]), State);
         false ->
-            Callback = fun(GrantedQos) -> send(?SUBACK_PACKET(PacketId, GrantedQos), State) end,
-            emqttd_session:subscribe(Session, TopicTable, Callback)
-    end,
-    {ok, State};
+            AckFun = fun(GrantedQos) ->
+                        send(?SUBACK_PACKET(PacketId, GrantedQos), State)
+                     end,
+            emqttd_session:subscribe(Session, TopicTable, AckFun), {ok, State}
+    end;
 
 %% protect from empty topic list
-handle(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
+process(?UNSUBSCRIBE_PACKET(PacketId, []), State) ->
     send(?UNSUBACK_PACKET(PacketId), State);
 
-handle(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
+process(?UNSUBSCRIBE_PACKET(PacketId, Topics), State = #proto_state{session = Session}) ->
     emqttd_session:unsubscribe(Session, Topics),
     send(?UNSUBACK_PACKET(PacketId), State);
 
-handle(?PACKET(?PINGREQ), State) ->
+process(?PACKET(?PINGREQ), State) ->
     send(?PACKET(?PINGRESP), State);
 
-handle(?PACKET(?DISCONNECT), State) ->
+process(?PACKET(?DISCONNECT), State) ->
     % clean willmsg
     {stop, normal, State#proto_state{will_msg = undefined}}.
 
-publish(Packet = ?PUBLISH(?QOS_0, _PacketId), #proto_state{client_id = ClientId, session = Session}) ->
-    emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet));
+publish(Packet = ?PUBLISH(?QOS_0, _PacketId),
+        #proto_state{client_id = ClientId, session = Session}) ->
+    Msg = emqttd_message:from_packet(ClientId, Packet),
+    emqttd_session:publish(Session, Msg);
 
-publish(Packet = ?PUBLISH(?QOS_1, PacketId), State = #proto_state{client_id = ClientId, session = Session}) ->
-    case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
+publish(Packet = ?PUBLISH(?QOS_1, PacketId),
+        State = #proto_state{client_id = ClientId, session = Session}) ->
+    Msg = emqttd_message:from_packet(ClientId, Packet),
+    case emqttd_session:publish(Session, Msg) of
         ok ->
             send(?PUBACK_PACKET(?PUBACK, PacketId), State);
         {error, Error} ->
-            lager:error("Client ~s: publish qos1 error - ~p", [ClientId, Error])
+            lager:error("Client(~s): publish qos1 error - ~p", [ClientId, Error])
     end;
 
-publish(Packet = ?PUBLISH(?QOS_2, PacketId), State = #proto_state{client_id = ClientId, session = Session}) ->
-    case emqttd_session:publish(Session, emqttd_message:from_packet(ClientId, Packet)) of
+publish(Packet = ?PUBLISH(?QOS_2, PacketId),
+        State = #proto_state{client_id = ClientId, session = Session}) ->
+    Msg = emqttd_message:from_packet(ClientId, Packet),
+    case emqttd_session:publish(Session, Msg) of
         ok ->
             send(?PUBACK_PACKET(?PUBREC, PacketId), State);
         {error, Error} ->
-            lager:error("Client ~s: publish qos2 error - ~p", [ClientId, Error])
+            lager:error("Client(~s): publish qos2 error - ~p", [ClientId, Error])
     end.
 
 -spec send(mqtt_message() | mqtt_packet(), proto_state()) -> {ok, proto_state()}.
@@ -285,9 +291,9 @@ send(Msg, State) when is_record(Msg, mqtt_message) ->
 	send(emqttd_message:to_packet(Msg), State);
 
 send(Packet, State = #proto_state{sendfun = SendFun, peername = Peername})
-        when is_record(Packet, mqtt_packet) ->
+    when is_record(Packet, mqtt_packet) ->
     trace(send, Packet, State),
-    sent_stats(Packet),
+    emqttd_metrics:sent(Packet),
     Data = emqttd_serialiser:serialise(Packet),
     lager:debug("SENT to ~s: ~p", [emqttd_net:format(Peername), Data]),
     emqttd_metrics:inc('bytes/sent', size(Data)),
@@ -370,28 +376,31 @@ validate_clientid(#mqtt_packet_connect{client_id = ClientId}, #proto_state{max_c
     true;
 
 %% MQTT3.1.1 allow null clientId.
-validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311, client_id = ClientId}, _ProtoState) 
+validate_clientid(#mqtt_packet_connect{proto_ver =?MQTT_PROTO_V311,
+                                       client_id = ClientId}, _ProtoState) 
     when size(ClientId) =:= 0 ->
     true;
 
-validate_clientid(#mqtt_packet_connect {proto_ver = Ver, clean_sess = CleanSess, client_id = ClientId}, _ProtoState) -> 
+validate_clientid(#mqtt_packet_connect{proto_ver  = Ver,
+                                       clean_sess = CleanSess,
+                                       client_id  = ClientId}, _ProtoState) -> 
     lager:warning("Invalid ClientId: ~s, ProtoVer: ~p, CleanSess: ~s", [ClientId, Ver, CleanSess]),
     false.
 
-validate_packet(#mqtt_packet{header  = #mqtt_packet_header{type = ?PUBLISH}, 
+validate_packet(#mqtt_packet{header   = #mqtt_packet_header{type = ?PUBLISH}, 
                              variable = #mqtt_packet_publish{topic_name = Topic}}) ->
 	case emqttd_topic:validate({name, Topic}) of
-	true -> ok;
-	false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
+        true  -> ok;
+        false -> lager:warning("Error publish topic: ~p", [Topic]), {error, badtopic}
 	end;
 
-validate_packet(#mqtt_packet{header  = #mqtt_packet_header{type = ?SUBSCRIBE},
+validate_packet(#mqtt_packet{header   = #mqtt_packet_header{type = ?SUBSCRIBE},
                              variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
 
     validate_topics(filter, Topics);
 
-validate_packet(#mqtt_packet{ header  = #mqtt_packet_header{type = ?UNSUBSCRIBE}, 
-                              variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
+validate_packet(#mqtt_packet{header   = #mqtt_packet_header{type = ?UNSUBSCRIBE}, 
+                             variable = #mqtt_packet_subscribe{topic_table = Topics}}) ->
 
     validate_topics(filter, Topics);
 
@@ -406,13 +415,16 @@ validate_topics(Type, Topics) when Type =:= name orelse Type =:= filter ->
 	ErrTopics = [Topic || {Topic, Qos} <- Topics,
 						not (emqttd_topic:validate({Type, Topic}) and validate_qos(Qos))],
 	case ErrTopics of
-	[] -> ok;
-	_ -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
+        [] -> ok;
+        _  -> lager:error("Error Topics: ~p", [ErrTopics]), {error, badtopic}
 	end.
 
-validate_qos(undefined) -> true;
-validate_qos(Qos) when Qos =< ?QOS_2 -> true;
-validate_qos(_) -> false.
+validate_qos(undefined) ->
+    true;
+validate_qos(Qos) when ?IS_QOS(Qos) ->
+    true;
+validate_qos(_) ->
+    false.
 
 %% publish ACL is cached in process dictionary.
 check_acl(publish, Topic, State) ->
@@ -428,20 +440,3 @@ check_acl(publish, Topic, State) ->
 check_acl(subscribe, Topic, State) ->
     emqttd_access_control:check_acl(client(State), subscribe, Topic).
 
-sent_stats(?PACKET(Type)) ->
-    emqttd_metrics:inc('packets/sent'), 
-    inc(Type).
-inc(?CONNACK) ->
-    emqttd_metrics:inc('packets/connack');
-inc(?PUBLISH) ->
-    emqttd_metrics:inc('messages/sent'),
-    emqttd_metrics:inc('packets/publish/sent');
-inc(?SUBACK) ->
-    emqttd_metrics:inc('packets/suback');
-inc(?UNSUBACK) ->
-    emqttd_metrics:inc('packets/unsuback');
-inc(?PINGRESP) ->
-    emqttd_metrics:inc('packets/pingresp');
-inc(_) ->
-    ingore.
-

+ 1 - 1
src/emqttd_sm_helper.erl

@@ -54,7 +54,7 @@ start_link() ->
 
 init([]) ->
     mnesia:subscribe(system),
-    {ok, TRef} = timer:send_interval(1000, tick),
+    {ok, TRef} = timer:send_interval(timer:seconds(1), tick),
     StatsFun = emqttd_stats:statsfun('sessions/count', 'sessions/max'),
     {ok, #state{stats_fun = StatsFun, tick_tref = TRef}}.
 

+ 5 - 5
src/emqttd_sysmon.erl

@@ -36,7 +36,7 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
          terminate/2, code_change/3]).
 
--record(state, {tref, events = []}).
+-record(state, {tick_tref, events = []}).
 
 %%------------------------------------------------------------------------------
 %% @doc Start system monitor
@@ -53,8 +53,8 @@ start_link(Opts) ->
 
 init([Opts]) ->
     erlang:system_monitor(self(), parse_opt(Opts)),
-    {ok, TRef} = timer:send_interval(1000, reset),
-    {ok, #state{tref = TRef}}.
+    {ok, TRef} = timer:send_interval(timer:seconds(1), reset),
+    {ok, #state{tick_tref = TRef}}.
 
 parse_opt(Opts) ->
     parse_opt(Opts, []).
@@ -134,8 +134,8 @@ handle_info(Info, State) ->
     lager:error("Unexpected info: ~p", [Info]),
     {noreply, State}.
 
-terminate(_Reason, #state{tref = TRef}) ->
-    timer:cancel(TRef), ok.
+terminate(_Reason, #state{tick_tref = TRef}) ->
+    timer:cancel(TRef).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

+ 1 - 1
src/emqttd_topic.erl

@@ -20,7 +20,7 @@
 %%% SOFTWARE.
 %%%-----------------------------------------------------------------------------
 %%% @doc
-%%% MQTT Topic
+%%% MQTT Topic Functions
 %%%
 %%% @end
 %%%-----------------------------------------------------------------------------

+ 30 - 29
src/emqttd_ws_client.erl

@@ -101,18 +101,18 @@ ws_loop(Data, State = #wsocket_state{request    = Req,
     Peer = Req:get(peer),
     lager:debug("RECV from ~s(WebSocket): ~p", [Peer, Data]),
     case Parser(iolist_to_binary(Data)) of
-    {more, NewParser} ->
-        State#wsocket_state{parser = NewParser};
-    {ok, Packet, Rest} ->
-        gen_server:cast(ClientPid, {received, Packet}),
-        ws_loop(Rest, reset_parser(State), ReplyChannel);
-    {error, Error} ->
-        lager:error("MQTT(WebSocket) detected framing error ~p for connection ~s", [Error, Peer]),
-        exit({shutdown, Error})
+        {more, NewParser} ->
+            State#wsocket_state{parser = NewParser};
+        {ok, Packet, Rest} ->
+            gen_server:cast(ClientPid, {received, Packet}),
+            ws_loop(Rest, reset_parser(State), ReplyChannel);
+        {error, Error} ->
+            lager:error("MQTT(WebSocket) frame error ~p for connection ~s", [Error, Peer]),
+            exit({shutdown, Error})
     end.
 
 reset_parser(State = #wsocket_state{packet_opts = PktOpts}) ->
-    State#wsocket_state{parser = emqttd_parser:new (PktOpts)}.
+    State#wsocket_state{parser = emqttd_parser:new(PktOpts)}.
 
 %%%=============================================================================
 %%% gen_fsm callbacks
@@ -124,7 +124,8 @@ init([WsPid, Req, ReplyChannel, PktOpts]) ->
     SendFun = fun(Payload) -> ReplyChannel({binary, Payload}) end,
     Headers = mochiweb_request:get(headers, Req),
     HeadersList = mochiweb_headers:to_list(Headers),
-    ProtoState = emqttd_protocol:init(Peername, SendFun, [{ws_initial_headers, HeadersList}|PktOpts]),
+    ProtoState = emqttd_protocol:init(Peername, SendFun,
+                                      [{ws_initial_headers, HeadersList}|PktOpts]),
     {ok, #client_state{ws_pid = WsPid, request = Req, proto_state = ProtoState}}.
 
 handle_call(session, _From, State = #client_state{proto_state = ProtoState}) ->
@@ -149,15 +150,15 @@ handle_cast({unsubscribe, Topics}, State) ->
 
 handle_cast({received, Packet}, State = #client_state{proto_state = ProtoState}) ->
     case emqttd_protocol:received(Packet, ProtoState) of
-    {ok, ProtoState1} ->
-        noreply(State#client_state{proto_state = ProtoState1});
-    {error, Error} ->
-        lager:error("MQTT protocol error ~p", [Error]),
-        stop({shutdown, Error}, State);
-    {error, Error, ProtoState1} ->
-        stop({shutdown, Error}, State#client_state{proto_state = ProtoState1});
-    {stop, Reason, ProtoState1} ->
-        stop(Reason, State#client_state{proto_state = ProtoState1})
+        {ok, ProtoState1} ->
+            noreply(State#client_state{proto_state = ProtoState1});
+        {error, Error} ->
+            lager:error("MQTT protocol error ~p", [Error]),
+            stop({shutdown, Error}, State);
+        {error, Error, ProtoState1} ->
+            stop({shutdown, Error}, State#client_state{proto_state = ProtoState1});
+        {stop, Reason, ProtoState1} ->
+            stop(Reason, State#client_state{proto_state = ProtoState1})
     end;
 
 handle_cast(_Msg, State) ->
@@ -189,18 +190,18 @@ handle_info({keepalive, start, TimeoutSec}, State = #client_state{request = Req}
 
 handle_info({keepalive, check}, State = #client_state{request = Req, keepalive = KeepAlive}) ->
     case emqttd_keepalive:check(KeepAlive) of
-    {ok, KeepAlive1} ->
-        lager:debug("Client(WebSocket) ~s: Keepalive Resumed", [Req:get(peer)]),
-        noreply(State#client_state{keepalive = KeepAlive1});
-    {error, timeout} ->
-        lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]),
-        stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined});
-    {error, Error} ->
-        lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]),
-        stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined})
+        {ok, KeepAlive1} ->
+            noreply(State#client_state{keepalive = KeepAlive1});
+        {error, timeout} ->
+            lager:debug("Client(WebSocket) ~s: Keepalive Timeout!", [Req:get(peer)]),
+            stop({shutdown, keepalive_timeout}, State#client_state{keepalive = undefined});
+        {error, Error} ->
+            lager:debug("Client(WebSocket) ~s: Keepalive Error: ~p", [Req:get(peer), Error]),
+            stop({shutdown, keepalive_error}, State#client_state{keepalive = undefined})
     end;
 
-handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid, proto_state = ProtoState}) ->
+handle_info({'EXIT', WsPid, Reason}, State = #client_state{ws_pid = WsPid,
+                                                           proto_state = ProtoState}) ->
     ClientId = emqttd_protocol:clientid(ProtoState),
     lager:warning("Websocket client ~s exit: reason=~p", [ClientId, Reason]),
     stop({shutdown, websocket_closed}, State);