Просмотр исходного кода

Merge pull request #1777 from emqtt/emqx30-dialyzer

Add banned support, and fix dialyzer errors
Feng Lee 7 лет назад
Родитель
Сommit
ec7b39f3fd

+ 5 - 0
etc/emqx.conf

@@ -529,6 +529,11 @@ zone.external.idle_timeout = 15s
 ## Default: 10 messages per second, and 100 messages burst.
 ## zone.external.publish_limit = 10,100
 
+## Enable ban check.
+##
+## Value: Flag
+zone.external.enable_ban = on
+
 ## Enable ACL check.
 ##
 ## Value: Flag

+ 6 - 0
priv/emqx.schema

@@ -676,6 +676,12 @@ end}.
   {datatype, {enum, [allow, deny]}}
 ]}.
 
+%% @doc Enable Ban.
+{mapping, "zone.$name.enable_ban", "emqx.zones", [
+  {default, off},
+  {datatype, flag}
+]}.
+
 %% @doc Enable ACL check.
 {mapping, "zone.$name.enable_acl", "emqx.zones", [
   {default, off},

+ 28 - 38
src/emqx.erl

@@ -66,41 +66,36 @@ is_running(Node) ->
 %% PubSub API
 %%--------------------------------------------------------------------
 
--spec(subscribe(emqx_topic:topic() | string()) -> ok | {error, term()}).
+-spec(subscribe(emqx_topic:topic() | string()) -> ok).
 subscribe(Topic) ->
     emqx_broker:subscribe(iolist_to_binary(Topic)).
 
--spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string())
-      -> ok | {error, term()}).
-subscribe(Topic, Sub) when is_list(Sub)->
-    emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub));
-subscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
-    {SubPid, SubId} = Subscriber,
-    emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId).
-
--spec(subscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string(),
-                emqx_topic:subopts()) -> ok | {error, term()}).
-subscribe(Topic, Sub, Options) when is_list(Sub)->
-    emqx_broker:subscribe(iolist_to_binary(Topic), list_to_subid(Sub), Options);
-subscribe(Topic, Subscriber, Options) when is_tuple(Subscriber)->
-    {SubPid, SubId} = Subscriber,
-    emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, SubId, Options).
+-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok).
+subscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId)->
+    emqx_broker:subscribe(iolist_to_binary(Topic), SubId);
+subscribe(Topic, SubPid) when is_pid(SubPid) ->
+    emqx_broker:subscribe(iolist_to_binary(Topic), SubPid).
+
+-spec(subscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid(),
+                emqx_types:subopts()) -> ok).
+subscribe(Topic, SubId, Options) when is_atom(SubId); is_binary(SubId)->
+    emqx_broker:subscribe(iolist_to_binary(Topic), SubId, Options);
+subscribe(Topic, SubPid, Options) when is_pid(SubPid)->
+    emqx_broker:subscribe(iolist_to_binary(Topic), SubPid, Options).
 
 -spec(publish(emqx_types:message()) -> {ok, emqx_types:deliver_results()}).
 publish(Msg) ->
     emqx_broker:publish(Msg).
 
--spec(unsubscribe(emqx_topic:topic() | string()) -> ok | {error, term()}).
+-spec(unsubscribe(emqx_topic:topic() | string()) -> ok).
 unsubscribe(Topic) ->
     emqx_broker:unsubscribe(iolist_to_binary(Topic)).
 
--spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subscriber() | string())
-      -> ok | {error, term()}).
-unsubscribe(Topic, Sub) when is_list(Sub) ->
-    emqx_broker:unsubscribe(iolist_to_binary(Topic), list_to_subid(Sub));
-unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
-    {SubPid, SubId} = Subscriber,
-    emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid, SubId).
+-spec(unsubscribe(emqx_topic:topic() | string(), emqx_types:subid() | pid()) -> ok).
+unsubscribe(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
+    emqx_broker:unsubscribe(iolist_to_binary(Topic), SubId);
+unsubscribe(Topic, SubPid) when is_pid(SubPid) ->
+    emqx_broker:unsubscribe(iolist_to_binary(Topic), SubPid).
 
 %%--------------------------------------------------------------------
 %% PubSub management API
@@ -109,12 +104,12 @@ unsubscribe(Topic, Subscriber) when is_tuple(Subscriber) ->
 -spec(get_subopts(emqx_topic:topic() | string(), emqx_types:subscriber())
       -> emqx_types:subopts()).
 get_subopts(Topic, Subscriber) ->
-    emqx_broker:get_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber)).
+    emqx_broker:get_subopts(iolist_to_binary(Topic), Subscriber).
 
 -spec(set_subopts(emqx_topic:topic() | string(), emqx_types:subscriber(),
-                  emqx_types:subopts()) -> ok).
-set_subopts(Topic, Subscriber, Options) when is_list(Options) ->
-    emqx_broker:set_subopts(iolist_to_binary(Topic), list_to_subid(Subscriber), Options).
+                  emqx_types:subopts()) -> boolean()).
+set_subopts(Topic, Subscriber, Options) when is_map(Options) ->
+    emqx_broker:set_subopts(iolist_to_binary(Topic), Subscriber, Options).
 
 -spec(topics() -> list(emqx_topic:topic())).
 topics() -> emqx_router:topics().
@@ -127,16 +122,11 @@ subscribers(Topic) ->
 subscriptions(Subscriber) ->
     emqx_broker:subscriptions(Subscriber).
 
--spec(subscribed(emqx_topic:topic() | string(), emqx_types:subscriber()) -> boolean()).
-subscribed(Topic, Subscriber) ->
-    emqx_broker:subscribed(iolist_to_binary(Topic), list_to_subid(Subscriber)).
-
-list_to_subid(SubId) when is_binary(SubId) ->
-    SubId;
-list_to_subid(SubId) when is_list(SubId) ->
-    iolist_to_binary(SubId);
-list_to_subid(SubPid) when is_pid(SubPid) ->
-    SubPid.
+-spec(subscribed(emqx_topic:topic() | string(), pid() | emqx_types:subid()) -> boolean()).
+subscribed(Topic, SubPid) when is_pid(SubPid) ->
+    emqx_broker:subscribed(iolist_to_binary(Topic), SubPid);
+subscribed(Topic, SubId) when is_atom(SubId); is_binary(SubId) ->
+    emqx_broker:subscribed(iolist_to_binary(Topic), SubId).
 
 %%--------------------------------------------------------------------
 %% Hooks API

+ 2 - 3
src/emqx_access_control.erl

@@ -153,9 +153,8 @@ init([]) ->
 
 handle_call({register_mod, Type, Mod, Opts, Seq}, _From, State) ->
     Mods = lookup_mods(Type),
-    reply(case lists:keyfind(Mod, 1, Mods) of
-              {_, _, _} ->
-                  {error, already_existed};
+    reply(case lists:keymember(Mod, 1, Mods) of
+              true  -> {error, already_existed};
               false ->
                   case catch Mod:init(Opts) of
                       {ok, ModState} ->

+ 3 - 1
src/emqx_acl_internal.erl

@@ -25,6 +25,8 @@
 
 -define(ACL_RULE_TAB, emqx_acl_rule).
 
+-type(state() :: #{acl_file := string()}).
+
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------
@@ -95,7 +97,7 @@ match(Credentials, Topic, [Rule|Rules]) ->
             {matched, AllowDeny}
     end.
 
--spec(reload_acl(#{}) -> ok | {error, term()}).
+-spec(reload_acl(state()) -> ok | {error, term()}).
 reload_acl(#{acl_file := AclFile}) ->
     case catch load_rules_from_file(AclFile) of
         true ->

+ 16 - 15
src/emqx_alarm_mgr.erl

@@ -28,10 +28,11 @@
 
 -define(ALARM_MGR, ?MODULE).
 
--record(state, {alarms}).
-
 start_link() ->
-    start_with(fun(Pid) -> gen_event:add_handler(Pid, ?MODULE, []) end).
+    start_with(
+      fun(Pid) ->
+          gen_event:add_handler(Pid, ?MODULE, [])
+      end).
 
 start_with(Fun) ->
     case gen_event:start_link({local, ?ALARM_MGR}) of
@@ -73,42 +74,42 @@ delete_alarm_handler(Module) when is_atom(Module) ->
 %% Default Alarm handler
 %%------------------------------------------------------------------------------
 
-init(_) -> {ok, #state{alarms = []}}.
+init(_) -> {ok, #{alarms => []}}.
 
 handle_event({set_alarm, Alarm = #alarm{timestamp = undefined}}, State)->
     handle_event({set_alarm, Alarm#alarm{timestamp = os:timestamp()}}, State);
 
-handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #state{alarms = Alarms}) ->
+handle_event({set_alarm, Alarm = #alarm{id = AlarmId}}, State = #{alarms := Alarms}) ->
     case encode_alarm(Alarm) of
         {ok, Json} ->
             emqx_broker:safe_publish(alarm_msg(alert, AlarmId, Json));
         {error, Reason} ->
             emqx_logger:error("[AlarmMgr] Failed to encode alarm: ~p", [Reason])
     end,
-    {ok, State#state{alarms = [Alarm|Alarms]}};
+    {ok, State#{alarms := [Alarm|Alarms]}};
 
-handle_event({clear_alarm, AlarmId}, State = #state{alarms = Alarms}) ->
-    case emqx_json:safe_encode([{id, AlarmId}, {ts, emqx_time:now_secs()}]) of
+handle_event({clear_alarm, AlarmId}, State = #{alarms := Alarms}) ->
+    case emqx_json:safe_encode([{id, AlarmId}, {ts, os:system_time(second)}]) of
         {ok, Json} ->
             emqx_broker:safe_publish(alarm_msg(clear, AlarmId, Json));
         {error, Reason} ->
             emqx_logger:error("[AlarmMgr] Failed to encode clear: ~p", [Reason])
     end,
-    {ok, State#state{alarms = lists:keydelete(AlarmId, 2, Alarms)}, hibernate};
+    {ok, State#{alarms := lists:keydelete(AlarmId, 2, Alarms)}, hibernate};
 
 handle_event(Event, State)->
-    error_logger:error("[AlarmMgr] unexpected event: ~p", [Event]),
+    emqx_logger:error("[AlarmMgr] unexpected event: ~p", [Event]),
     {ok, State}.
 
 handle_info(Info, State) ->
-    error_logger:error("[AlarmMgr] unexpected info: ~p", [Info]),
+    emqx_logger:error("[AlarmMgr] unexpected info: ~p", [Info]),
     {ok, State}.
 
-handle_call(get_alarms, State = #state{alarms = Alarms}) ->
+handle_call(get_alarms, State = #{alarms := Alarms}) ->
     {ok, Alarms, State};
 
 handle_call(Req, State) ->
-    error_logger:error("[AlarmMgr] unexpected call: ~p", [Req]),
+    emqx_logger:error("[AlarmMgr] unexpected call: ~p", [Req]),
     {ok, ignored, State}.
 
 terminate(swap, State) ->
@@ -132,8 +133,8 @@ encode_alarm(#alarm{id = AlarmId, severity = Severity, title = Title,
 
 alarm_msg(Type, AlarmId, Json) ->
     Msg = emqx_message:make(?ALARM_MGR, topic(Type, AlarmId), Json),
-    emqx_message:set_headers(#{'Content-Type' => <<"application/json">>},
-                             emqx_message:set_flags(#{sys => true}, Msg)).
+    emqx_message:set_headers( #{'Content-Type' => <<"application/json">>},
+                              emqx_message:set_flag(sys, Msg)).
 
 topic(alert, AlarmId) ->
     emqx_topic:systop(<<"alarms/", AlarmId/binary, "/alert">>);

+ 20 - 27
src/emqx_banned.erl

@@ -24,27 +24,23 @@
 -boot_mnesia({mnesia, [boot]}).
 -copy_mnesia({mnesia, [copy]}).
 
-%% API
 -export([start_link/0]).
 -export([check/1]).
 -export([add/1, del/1]).
 
-%% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 
 -define(TAB, ?MODULE).
 -define(SERVER, ?MODULE).
 
--record(state, {expiry_timer}).
-
 %%--------------------------------------------------------------------
 %% Mnesia bootstrap
 %%--------------------------------------------------------------------
 
 mnesia(boot) ->
     ok = ekka_mnesia:create_table(?TAB, [
-                {type, ordered_set},
+                {type, set},
                 {disc_copies, [node()]},
                 {record_name, banned},
                 {attributes, record_info(fields, banned)}]);
@@ -52,11 +48,7 @@ mnesia(boot) ->
 mnesia(copy) ->
     ok = ekka_mnesia:copy_table(?TAB).
 
-%%--------------------------------------------------------------------
-%% API
-%%--------------------------------------------------------------------
-
-%% @doc Start the banned server
+%% @doc Start the banned server.
 -spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
@@ -67,9 +59,13 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -
         orelse ets:member(?TAB, {username, Username})
             orelse ets:member(?TAB, {ipaddr, IPAddr}).
 
-add(Record) when is_record(Record, banned) ->
-    mnesia:dirty_write(?TAB, Record).
+-spec(add(#banned{}) -> ok).
+add(Banned) when is_record(Banned, banned) ->
+    mnesia:dirty_write(?TAB, Banned).
 
+-spec(del({client_id, emqx_types:client_id()} |
+          {username, emqx_types:username()} |
+          {peername, emqx_types:peername()}) -> ok).
 del(Key) ->
     mnesia:dirty_delete(?TAB, Key).
 
@@ -78,27 +74,26 @@ del(Key) ->
 %%--------------------------------------------------------------------
 
 init([]) ->
-    emqx_time:seed(),
-    {ok, ensure_expiry_timer(#state{})}.
+    {ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
 
 handle_call(Req, _From, State) ->
-    emqx_logger:error("[BANNED] Unexpected request: ~p", [Req]),
-    {reply, ignore, State}.
+    emqx_logger:error("[BANNED] unexpected call: ~p", [Req]),
+    {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    emqx_logger:error("[BANNED] Unexpected msg: ~p", [Msg]),
+    emqx_logger:error("[BANNED] unexpected msg: ~p", [Msg]),
     {noreply, State}.
 
-handle_info({timeout, Ref, expire}, State = #state{expiry_timer = Ref}) ->
+handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
     mnesia:async_dirty(fun expire_banned_items/1, [erlang:timestamp()]),
     {noreply, ensure_expiry_timer(State), hibernate};
 
 handle_info(Info, State) ->
-    emqx_logger:error("[BANNED] Unexpected info: ~p", [Info]),
+    emqx_logger:error("[BANNED] unexpected info: ~p", [Info]),
     {noreply, State}.
 
-terminate(_Reason, #state{expiry_timer = Timer}) ->
-    emqx_misc:cancel_timer(Timer).
+terminate(_Reason, #{expiry_timer := TRef}) ->
+    emqx_misc:cancel_timer(TRef).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -108,9 +103,7 @@ code_change(_OldVsn, State, _Extra) ->
 %%--------------------------------------------------------------------
 
 ensure_expiry_timer(State) ->
-    Interval = emqx_config:get_env(banned_expiry_interval, timer:minutes(5)),
-    State#state{expiry_timer = emqx_misc:start_timer(
-                                 Interval + rand:uniform(Interval), expire)}.
+    State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
 
 expire_banned_items(Now) ->
     expire_banned_item(mnesia:first(?TAB), Now).
@@ -119,11 +112,11 @@ expire_banned_item('$end_of_table', _Now) ->
     ok;
 expire_banned_item(Key, Now) ->
     case mnesia:read(?TAB, Key) of
-        [#banned{until = undefined}] -> ok;
+        [#banned{until = undefined}] ->
+            ok;
         [B = #banned{until = Until}] when Until < Now ->
            mnesia:delete_object(?TAB, B, sticky_write);
-        [_] -> ok;
-        [] -> ok
+        _ -> ok
     end,
     expire_banned_item(mnesia:next(?TAB, Key), Now).
 

+ 3 - 2
src/emqx_bridge.erl

@@ -133,7 +133,8 @@ handle_info(start, State = #state{options = Options,
             Subs = get_value(subscriptions, Options, []),
             [emqx_client:subscribe(ClientPid, {i2b(Topic), Qos}) || {Topic, Qos} <- Subs],
             ForwardRules = string:tokens(get_value(forward_rule, Options, ""), ","),
-            [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules, emqx_topic:validate({filter, i2b(Topic)})],
+            [emqx_broker:subscribe(i2b(Topic)) || Topic <- ForwardRules,
+                                                  emqx_topic:validate({filter, i2b(Topic)})],
             {noreply, State#state{client_pid = ClientPid}};
         {error,_} ->
             erlang:send_after(ReconnectTime, self(), start),
@@ -251,4 +252,4 @@ store(disk, Data, Queue, _MaxPendingMsg)->
 delete(memory, PkgId, Queue) ->
     lists:keydelete(PkgId, 1, Queue);
 delete(disk, PkgId, Queue) ->
-    lists:keydelete(PkgId, 1, Queue).
+    lists:keydelete(PkgId, 1, Queue).

+ 2 - 2
src/emqx_broker.erl

@@ -260,9 +260,9 @@ subscription(Topic, Subscriber) ->
 
 -spec(subscribed(emqx_topic:topic(), pid() | emqx_types:subid() | emqx_types:subscriber()) -> boolean()).
 subscribed(Topic, SubPid) when is_binary(Topic), is_pid(SubPid) ->
-    length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) == 1;
+    length(ets:match_object(?SUBOPTION, {{Topic, {SubPid, '_'}}, '_'}, 1)) >= 1;
 subscribed(Topic, SubId) when is_binary(Topic), ?is_subid(SubId) ->
-    length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) == 1;
+    length(ets:match_object(?SUBOPTION, {{Topic, {'_', SubId}}, '_'}, 1)) >= 1;
 subscribed(Topic, {SubPid, SubId}) when is_binary(Topic), is_pid(SubPid), ?is_subid(SubId) ->
     ets:member(?SUBOPTION, {Topic, {SubPid, SubId}}).
 

+ 1 - 1
src/emqx_client.erl

@@ -186,7 +186,7 @@ with_owner(Options) ->
 connect(Client) ->
     gen_statem:call(Client, connect, infinity).
 
--spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]})
+-spec(subscribe(client(), topic() | {topic(), qos() | [subopt()]} | [{topic(), qos()}])
       -> subscribe_ret()).
 subscribe(Client, Topic) when is_binary(Topic) ->
     subscribe(Client, {Topic, ?QOS_0});

+ 13 - 7
src/emqx_cm_sup.erl

@@ -25,11 +25,17 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    {ok, {{one_for_all, 10, 3600},
-          [#{id       => manager,
-             start    => {emqx_cm, start_link, []},
-             restart  => permanent,
-             shutdown => 5000,
-             type     => worker,
-             modules  => [emqx_cm]}]}}.
+    Banned = #{id       => banned,
+               start    => {emqx_banned, start_link, []},
+               restart  => permanent,
+               shutdown => 5000,
+               type     => worker,
+               modules  => [emqx_banned]},
+    Manager = #{id       => manager,
+                start    => {emqx_cm, start_link, []},
+                restart  => permanent,
+                shutdown => 5000,
+                type     => worker,
+                modules  => [emqx_cm]},
+    {ok, {{one_for_one, 10, 100}, [Banned, Manager]}}.
 

+ 15 - 16
src/emqx_connection.erl

@@ -202,20 +202,19 @@ handle_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
         {ok, ProtoState1} ->
             {noreply, maybe_gc(ensure_stats_timer(State#state{proto_state = ProtoState1}))};
         {error, Reason} ->
-            shutdown(Reason, State);
-        {error, Reason, ProtoState1} ->
-            shutdown(Reason, State#state{proto_state = ProtoState1})
+            shutdown(Reason, State)
     end;
 
-handle_info(emit_stats, State = #state{proto_state = ProtoState}) ->
+handle_info({timeout, Timer, emit_stats},
+            State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
     emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
     {noreply, State#state{stats_timer = undefined}, hibernate};
 
 handle_info(timeout, State) ->
     shutdown(idle_timeout, State);
 
-handle_info({shutdown, Error}, State) ->
-    shutdown(Error, State);
+handle_info({shutdown, Reason}, State) ->
+    shutdown(Reason, State);
 
 handle_info({shutdown, discard, {ClientId, ByPid}}, State) ->
     ?LOG(warning, "discarded by ~s:~p", [ClientId, ByPid], State),
@@ -311,13 +310,13 @@ handle_packet(Data, State = #state{proto_state  = ProtoState,
                 {ok, ProtoState1} ->
                     NewState = State#state{proto_state = ProtoState1},
                     handle_packet(Rest, inc_publish_cnt(Type, reset_parser(NewState)));
-                {error, Error} ->
-                    ?LOG(error, "Protocol error - ~p", [Error], State),
-                    shutdown(Error, State);
-                {error, Error, ProtoState1} ->
-                    shutdown(Error, State#state{proto_state = ProtoState1});
-                {stop, Reason, ProtoState1} ->
-                    stop(Reason, State#state{proto_state = ProtoState1})
+                {error, Reason} ->
+                    ?LOG(error, "Process packet error - ~p", [Reason], State),
+                    shutdown(Reason, State);
+                {error, Reason, ProtoState1} ->
+                    shutdown(Reason, State#state{proto_state = ProtoState1});
+                {stop, Error, ProtoState1} ->
+                    stop(Error, State#state{proto_state = ProtoState1})
             end;
         {error, Error} ->
             ?LOG(error, "Framing error - ~p", [Error], State),
@@ -371,9 +370,9 @@ run_socket(State = #state{transport = Transport, socket = Socket}) ->
 %%------------------------------------------------------------------------------
 
 ensure_stats_timer(State = #state{enable_stats = true,
-                                   stats_timer  = undefined,
-                                   idle_timeout = IdleTimeout}) ->
-    State#state{stats_timer = erlang:send_after(IdleTimeout, self(), emit_stats)};
+                                  stats_timer  = undefined,
+                                  idle_timeout = IdleTimeout}) ->
+    State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
 ensure_stats_timer(State) -> State.
 
 shutdown(Reason, State) ->

+ 4 - 10
src/emqx_local_bridge.erl

@@ -60,8 +60,8 @@ init([Pool, Id, Node, Topic, Options]) ->
     case net_kernel:connect_node(Node) of
         true ->
             true = erlang:monitor_node(Node, true),
-            Share = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
-            emqx_broker:subscribe(Topic, self(), [{share, Share}, {qos, ?QOS_0}]),
+            Group = iolist_to_binary(["$bridge:", atom_to_list(Node), ":", Topic]),
+            emqx_broker:subscribe(Topic, self(), #{share => Group, qos => ?QOS_0}),
             State = parse_opts(Options, #state{node = Node, subtopic = Topic}),
             MQueue = emqx_mqueue:init(#{type => simple,
                                         max_len => State#state.max_queue_len,
@@ -86,11 +86,6 @@ parse_opts([{ping_down_interval, Interval} | Opts], State) ->
 parse_opts([_Opt | Opts], State) ->
     parse_opts(Opts, State).
 
-qname(Node, Topic) when is_atom(Node) ->
-    qname(atom_to_list(Node), Topic);
-qname(Node, Topic) ->
-    iolist_to_binary(["Bridge:", Node, ":", Topic]).
-
 handle_call(Req, _From, State) ->
     emqx_logger:error("[Bridge] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
@@ -104,7 +99,7 @@ handle_info({dispatch, _Topic, Msg}, State = #state{mqueue = Q, status = down})
     {noreply, State#state{mqueue = emqx_mqueue:in(Msg, Q)}};
 
 handle_info({dispatch, _Topic, Msg}, State = #state{node = Node, status = up}) ->
-    ok = emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]),
+    emqx_rpc:cast(Node, emqx_broker, publish, [transform(Msg, State)]),
     {noreply, State};
 
 handle_info({nodedown, Node}, State = #state{node = Node, ping_down_interval = Interval}) ->
@@ -157,7 +152,6 @@ dequeue(State = #state{mqueue = MQ}) ->
             dequeue(State#state{mqueue = MQ1})
     end.
 
-transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix,
-                                                topic_suffix = Suffix}) ->
+transform(Msg = #message{topic = Topic}, #state{topic_prefix = Prefix, topic_suffix = Suffix}) ->
     Msg#message{topic = <<Prefix/binary, Topic/binary, Suffix/binary>>}.
 

+ 4 - 6
src/emqx_metrics.erl

@@ -26,8 +26,6 @@
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 
--record(state, {}).
-
 %% Bytes sent and received of Broker
 -define(BYTES_METRICS, [
     {counter, 'bytes/received'}, % Total bytes received
@@ -85,8 +83,8 @@
 -define(TAB, ?MODULE).
 -define(SERVER, ?MODULE).
 
-%% @doc Start the metrics server
--spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
+%% @doc Start the metrics server.
+-spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
 
@@ -252,7 +250,7 @@ init([]) ->
     % Create metrics table
     _ = emqx_tables:new(?TAB, [set, public, {write_concurrency, true}]),
     lists:foreach(fun new/1, ?BYTES_METRICS ++ ?PACKET_METRICS ++ ?MESSAGE_METRICS),
-    {ok, #state{}, hibernate}.
+    {ok, #{}, hibernate}.
 
 handle_call(Req, _From, State) ->
     emqx_logger:error("[Metrics] unexpected call: ~p", [Req]),
@@ -266,7 +264,7 @@ handle_info(Info, State) ->
     emqx_logger:error("[Metrics] unexpected info: ~p", [Info]),
     {noreply, State}.
 
-terminate(_Reason, #state{}) ->
+terminate(_Reason, #{}) ->
     ok.
 
 code_change(_OldVsn, State, _Extra) ->

+ 2 - 2
src/emqx_mod_rewrite.erl

@@ -23,8 +23,8 @@
 
 load(Rules0) ->
     Rules = compile(Rules0),
-    emqx_hooks:add('client.subscribe',  fun ?MODULE:rewrite_subscribe/4, [Rules]),
-    emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/4, [Rules]),
+    emqx_hooks:add('client.subscribe',  fun ?MODULE:rewrite_subscribe/3, [Rules]),
+    emqx_hooks:add('client.unsubscribe',fun ?MODULE:rewrite_unsubscribe/3, [Rules]),
     emqx_hooks:add('message.publish',   fun ?MODULE:rewrite_publish/2, [Rules]).
 
 rewrite_subscribe(_Credentials, TopicTable, Rules) ->

+ 19 - 4
src/emqx_protocol.erl

@@ -56,6 +56,7 @@
           mountpoint,
           is_super,
           is_bridge,
+          enable_ban,
           enable_acl,
           recv_stats,
           send_stats,
@@ -97,6 +98,7 @@ init(#{peername := Peername, peercert := Peercert, sendfun := SendFun}, Options)
             packet_size  = emqx_zone:get_env(Zone, max_packet_size),
             mountpoint   = emqx_zone:get_env(Zone, mountpoint),
             is_bridge    = false,
+            enable_ban   = emqx_zone:get_env(Zone, enable_ban, false),
             enable_acl   = emqx_zone:get_env(Zone, enable_acl),
             recv_stats   = #{msg => 0, pkt => 0},
             send_stats   = #{msg => 0, pkt => 0},
@@ -186,14 +188,14 @@ session(#pstate{session = SPid}) ->
     SPid.
 
 parser(#pstate{packet_size = Size, proto_ver = Ver}) ->
-    emqx_frame:initial_state(#{packet_size => Size, version => Ver}).
+    emqx_frame:initial_state(#{max_packet_size => Size, version => Ver}).
 
 %%------------------------------------------------------------------------------
 %% Packet Received
 %%------------------------------------------------------------------------------
 
--spec(received(emqx_mqtt_types:packet(), state())
-      -> {ok, state()} | {error, term()} | {error, term(), state()}).
+-spec(received(emqx_mqtt_types:packet(), state()) ->
+    {ok, state()} | {error, term()} | {error, term(), state()} | {stop, term(), state()}).
 received(?PACKET(Type), PState = #pstate{connected = false}) when Type =/= ?CONNECT ->
     {error, proto_not_connected, PState};
 
@@ -449,6 +451,7 @@ puback(?QOS_2, PacketId, {ok, _}, PState) ->
 %% Deliver Packet -> Client
 %%------------------------------------------------------------------------------
 
+-spec(deliver(tuple(), state()) -> {ok, state()} | {error, term()}).
 deliver({connack, ReasonCode}, PState) ->
     send(?CONNACK_PACKET(ReasonCode), PState);
 
@@ -581,7 +584,8 @@ set_property(Name, Value, Props) ->
 
 check_connect(Packet, PState) ->
     run_check_steps([fun check_proto_ver/2,
-                     fun check_client_id/2], Packet, PState).
+                     fun check_client_id/2,
+                     fun check_banned/2], Packet, PState).
 
 check_proto_ver(#mqtt_packet_connect{proto_ver  = Ver,
                                      proto_name = Name}, _PState) ->
@@ -612,6 +616,17 @@ check_client_id(#mqtt_packet_connect{client_id = ClientId}, #pstate{zone = Zone}
         false -> {error, ?RC_CLIENT_IDENTIFIER_NOT_VALID}
     end.
 
+check_banned(_Connect, #pstate{enable_ban = false}) ->
+    ok;
+check_banned(#mqtt_packet_connect{client_id = ClientId, username = Username},
+             #pstate{peername = Peername}) ->
+    case emqx_banned:check(#{client_id => ClientId,
+                             username  => Username,
+                             peername  => Peername}) of
+        true  -> {error, ?RC_BANNED};
+        false -> ok
+    end.
+
 check_publish(Packet, PState) ->
     run_check_steps([fun check_pub_caps/2,
                      fun check_pub_acl/2], Packet, PState).

+ 4 - 1
src/emqx_session.erl

@@ -148,6 +148,9 @@
          }).
 
 -type(spid() :: pid()).
+-type(attr() :: {atom(), term()}).
+
+-export_type([attr/0]).
 
 -define(TIMEOUT, 60000).
 
@@ -564,7 +567,7 @@ handle_info({timeout, Timer, check_awaiting_rel}, State = #state{await_rel_timer
     noreply(expire_awaiting_rel(State#state{await_rel_timer = undefined}));
 
 handle_info({timeout, Timer, emit_stats}, State = #state{client_id = ClientId, stats_timer = Timer}) ->
-    true = emqx_sm:set_session_stats(ClientId, stats(State)),
+    _ = emqx_sm:set_session_stats(ClientId, stats(State)),
     {noreply, State#state{stats_timer = undefined}, hibernate};
 
 handle_info({timeout, Timer, expired}, State = #state{expiry_timer = Timer}) ->

+ 21 - 26
src/emqx_sm.erl

@@ -49,22 +49,20 @@ start_link() ->
 
 %% @doc Open a session.
 -spec(open_session(map()) -> {ok, pid()} | {ok, pid(), boolean()} | {error, term()}).
-open_session(Attrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
+open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid := ConnPid}) ->
     CleanStart = fun(_) ->
                      ok = discard_session(ClientId, ConnPid),
-                     emqx_session_sup:start_session(Attrs)
+                     emqx_session_sup:start_session(SessAttrs)
                  end,
     emqx_sm_locker:trans(ClientId, CleanStart);
 
-open_session(Attrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) ->
+open_session(SessAttrs = #{clean_start := false, client_id := ClientId, conn_pid := ConnPid}) ->
     ResumeStart = fun(_) ->
                       case resume_session(ClientId, ConnPid) of
                           {ok, SPid} ->
                               {ok, SPid, true};
                           {error, not_found} ->
-                              emqx_session_sup:start_session(Attrs);
-                          {error, Reason} ->
-                              {error, Reason}
+                              emqx_session_sup:start_session(SessAttrs)
                       end
                   end,
     emqx_sm_locker:trans(ClientId, ResumeStart).
@@ -113,31 +111,31 @@ close_session(SPid) when is_pid(SPid) ->
 
 %% @doc Register a session with attributes.
 -spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()},
-                       list(emqx_session:attribute())) -> ok).
-register_session(ClientId, Attrs) when is_binary(ClientId) ->
-    register_session({ClientId, self()}, Attrs);
+                       list(emqx_session:attr())) -> ok).
+register_session(ClientId, SessAttrs) when is_binary(ClientId) ->
+    register_session({ClientId, self()}, SessAttrs);
 
-register_session(Session = {ClientId, SPid}, Attrs)
+register_session(Session = {ClientId, SPid}, SessAttrs)
     when is_binary(ClientId), is_pid(SPid) ->
     ets:insert(?SESSION_TAB, Session),
-    ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}),
-    case proplists:get_value(clean_start, Attrs, true) of
-        true  -> ok;
-        false  -> ets:insert(?SESSION_P_TAB, Session)
-    end,
+    ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}),
+    proplists:get_value(clean_start, SessAttrs, true)
+        andalso ets:insert(?SESSION_P_TAB, Session),
     emqx_sm_registry:register_session(Session),
     notify({registered, ClientId, SPid}).
 
 %% @doc Get session attrs
--spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attribute())).
+-spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())).
 get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
     safe_lookup_element(?SESSION_ATTRS_TAB, Session, []).
 
 %% @doc Set session attrs
-set_session_attrs(ClientId, Attrs) when is_binary(ClientId) ->
-    set_session_attrs({ClientId, self()}, Attrs);
-set_session_attrs(Session = {ClientId, SPid}, Attrs) when is_binary(ClientId), is_pid(SPid) ->
-    ets:insert(?SESSION_ATTRS_TAB, {Session, Attrs}).
+-spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()},
+                        list(emqx_session:attr())) -> true).
+set_session_attrs(ClientId, SessAttrs) when is_binary(ClientId) ->
+    set_session_attrs({ClientId, self()}, SessAttrs);
+set_session_attrs(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) ->
+    ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}).
 
 %% @doc Unregister a session
 -spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok).
@@ -154,18 +152,15 @@ unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(
 
 %% @doc Get session stats
 -spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
-get_session_stats(Session = {ClientId, SPid})
-    when is_binary(ClientId), is_pid(SPid) ->
+get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
     safe_lookup_element(?SESSION_STATS_TAB, Session, []).
 
 %% @doc Set session stats
 -spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()},
-                        emqx_stats:stats()) -> ok).
+                        emqx_stats:stats()) -> true).
 set_session_stats(ClientId, Stats) when is_binary(ClientId) ->
     set_session_stats({ClientId, self()}, Stats);
-
-set_session_stats(Session = {ClientId, SPid}, Stats)
-    when is_binary(ClientId), is_pid(SPid) ->
+set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), is_pid(SPid) ->
     ets:insert(?SESSION_STATS_TAB, {Session, Stats}).
 
 %% @doc Lookup a session from registry

+ 17 - 17
src/emqx_sm_registry.erl

@@ -20,7 +20,9 @@
 
 -export([start_link/0]).
 -export([is_enabled/0]).
+
 -export([register_session/1, lookup_session/1, unregister_session/1]).
+
 %% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
@@ -30,12 +32,11 @@
 -define(LOCK, {?MODULE, cleanup_sessions}).
 
 -record(global_session, {sid, pid}).
--record(state, {}).
 
 -type(session_pid() :: pid()).
 
-%% @doc Start the session manager.
--spec(start_link() -> {ok, pid()} | ignore | {error, term()}).
+%% @doc Start the global session manager.
+-spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?REGISTRY}, ?MODULE, [], []).
 
@@ -46,19 +47,18 @@ is_enabled() ->
 -spec(lookup_session(emqx_types:client_id())
       -> list({emqx_types:client_id(), session_pid()})).
 lookup_session(ClientId) ->
-    [{ClientId, SessionPid} || #global_session{pid = SessionPid}
-                               <- mnesia:dirty_read(?TAB, ClientId)].
+    [{ClientId, SessPid} || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)].
 
 -spec(register_session({emqx_types:client_id(), session_pid()}) -> ok).
-register_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) ->
-    mnesia:dirty_write(?TAB, record(ClientId, SessionPid)).
+register_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) ->
+    mnesia:dirty_write(?TAB, record(ClientId, SessPid)).
 
 -spec(unregister_session({emqx_types:client_id(), session_pid()}) -> ok).
-unregister_session({ClientId, SessionPid}) when is_binary(ClientId), is_pid(SessionPid) ->
-    mnesia:dirty_delete_object(?TAB, record(ClientId, SessionPid)).
+unregister_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) ->
+    mnesia:dirty_delete_object(?TAB, record(ClientId, SessPid)).
 
-record(ClientId, SessionPid) ->
-    #global_session{sid = ClientId, pid = SessionPid}.
+record(ClientId, SessPid) ->
+    #global_session{sid = ClientId, pid = SessPid}.
 
 %%------------------------------------------------------------------------------
 %% gen_server callbacks
@@ -72,7 +72,7 @@ init([]) ->
                 {attributes, record_info(fields, global_session)}]),
     ok = ekka_mnesia:copy_table(?TAB),
     ekka:monitor(membership),
-    {ok, #state{}}.
+    {ok, #{}}.
 
 handle_call(Req, _From, State) ->
     emqx_logger:error("[Registry] unexpected call: ~p", [Req]),
@@ -107,9 +107,9 @@ code_change(_OldVsn, State, _Extra) ->
 %%------------------------------------------------------------------------------
 
 cleanup_sessions(Node) ->
-    Pat = [{#global_session{pid = '$1', _ = '_'},
-            [{'==', {node, '$1'}, Node}], ['$_']}],
-    lists:foreach(fun(Session) ->
-                      mnesia:delete_object(?TAB, Session)
-                  end, mnesia:select(?TAB, Pat)).
+    Pat = [{#global_session{pid = '$1', _ = '_'}, [{'==', {node, '$1'}, Node}], ['$_']}],
+    lists:foreach(fun delete_session/1, mnesia:select(?TAB, Pat, write)).
+
+delete_session(Session) ->
+    mnesia:delete_object(?TAB, Session, write).
 

+ 2 - 1
src/emqx_stats.erl

@@ -31,9 +31,10 @@
          code_change/3]).
 
 -record(update, {name, countdown, interval, func}).
--record(state, {timer, updates :: #update{}}).
+-record(state, {timer, updates :: [#update{}]}).
 
 -type(stats() :: list({atom(), non_neg_integer()})).
+
 -export_type([stats/0]).
 
 %% Connection stats

+ 75 - 56
src/emqx_ws_connection.erl

@@ -40,38 +40,61 @@
           keepalive,
           enable_stats,
           stats_timer,
-          shutdown_reason
+          shutdown
          }).
 
--define(INFO_KEYS, [peername, sockname]).
-
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
 
 -define(WSLOG(Level, Format, Args, State),
-        emqx_logger:Level("WSMQTT(~s): " ++ Format, [esockd_net:format(State#state.peername) | Args])).
+        emqx_logger:Level("MQTT/WS(~s): " ++ Format,
+                          [esockd_net:format(State#state.peername) | Args])).
 
 %%------------------------------------------------------------------------------
 %% API
 %%------------------------------------------------------------------------------
 
 %% for debug
-info(WSPid) ->
-    call(WSPid, info).
+info(WSPid) when is_pid(WSPid) ->
+    call(WSPid, info);
+
+info(#state{peername    = Peername,
+            sockname    = Sockname,
+            proto_state = ProtoState}) ->
+    ProtoInfo = emqx_protocol:info(ProtoState),
+    ConnInfo = [{socktype, websocket},
+                {conn_state, running},
+                {peername, Peername},
+                {sockname, Sockname}],
+    lists:append([ConnInfo, ProtoInfo]).
 
 %% for dashboard
-attrs(CPid) when is_pid(CPid) ->
-    call(CPid, attrs).
+attrs(WSPid) when is_pid(WSPid) ->
+    call(WSPid, attrs);
+
+attrs(#state{peername    = Peername,
+             sockname    = Sockname,
+             proto_state = ProtoState}) ->
+    SockAttrs = [{peername, Peername},
+                 {sockname, Sockname}],
+    ProtoAttrs = emqx_protocol:attrs(ProtoState),
+    lists:usort(lists:append(SockAttrs, ProtoAttrs)).
+
+stats(WSPid) when is_pid(WSPid) ->
+    call(WSPid, stats);
 
-stats(WSPid) ->
-    call(WSPid, stats).
+stats(#state{proto_state = ProtoState}) ->
+    lists:append([wsock_stats(),
+                  emqx_misc:proc_stats(),
+                  emqx_protocol:stats(ProtoState)
+                 ]).
 
-kick(WSPid) ->
+kick(WSPid) when is_pid(WSPid) ->
     call(WSPid, kick).
 
-session(WSPid) ->
+session(WSPid) when is_pid(WSPid) ->
     call(WSPid, session).
 
-call(WSPid, Req) ->
+call(WSPid, Req) when is_pid(WSPid) ->
     Mref = erlang:monitor(process, WSPid),
     WSPid ! {call, {self(), Mref}, Req},
     receive
@@ -153,41 +176,30 @@ websocket_handle({binary, Data}, State = #state{parser_state = ParserState,
                     websocket_handle({binary, Rest}, reset_parser(State#state{proto_state = ProtoState1}));
                 {error, Error} ->
                     ?WSLOG(error, "Protocol error - ~p", [Error], State),
-                    {stop, State};
-                {error, Error, ProtoState1} ->
-                    shutdown(Error, State#state{proto_state = ProtoState1});
-                {stop, Reason, ProtoState1} ->
-                    shutdown(Reason, State#state{proto_state = ProtoState1})
+                    stop(Error, State);
+                {error, Reason, ProtoState1} ->
+                    shutdown(Reason, State#state{proto_state = ProtoState1});
+                {stop, Error, ProtoState1} ->
+                    stop(Error, State#state{proto_state = ProtoState1})
             end;
         {error, Error} ->
             ?WSLOG(error, "Frame error: ~p", [Error], State),
-            {stop, State};
+            stop(Error, State);
         {'EXIT', Reason} ->
             ?WSLOG(error, "Frame error:~p~nFrame data: ~p", [Reason, Data], State),
-            {stop, State}
+            shutdown(parse_error, State)
     end.
 
-websocket_info({call, From, info}, State = #state{peername    = Peername,
-                                                  sockname    = Sockname,
-                                                  proto_state = ProtoState}) ->
-    ProtoInfo = emqx_protocol:info(ProtoState),
-    ConnInfo = [{socktype, websocket}, {conn_state, running},
-                {peername, Peername}, {sockname, Sockname}],
-    gen_server:reply(From, lists:append([ConnInfo, ProtoInfo])),
+websocket_info({call, From, info}, State) ->
+    gen_server:reply(From, info(State)),
     {ok, State};
 
-websocket_info({call, From, attrs}, State = #state{peername    = Peername,
-                                                   sockname    = Sockname,
-                                                   proto_state = ProtoState}) ->
-    SockAttrs = [{peername, Peername},
-                 {sockname, Sockname}],
-    ProtoAttrs = emqx_protocol:attrs(ProtoState),
-    gen_server:reply(From, lists:usort(lists:append(SockAttrs, ProtoAttrs))),
+websocket_info({call, From, attrs}, State) ->
+    gen_server:reply(From, attrs(State)),
     {ok, State};
 
-websocket_info({call, From, stats}, State = #state{proto_state = ProtoState}) ->
-    Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(), emqx_protocol:stats(ProtoState)]),
-    gen_server:reply(From, Stats),
+websocket_info({call, From, stats}, State) ->
+    gen_server:reply(From, stats(State)),
     {ok, State};
 
 websocket_info({call, From, kick}, State) ->
@@ -203,15 +215,12 @@ websocket_info({deliver, PubOrAck}, State = #state{proto_state = ProtoState}) ->
         {ok, ProtoState1} ->
             {ok, ensure_stats_timer(State#state{proto_state = ProtoState1})};
         {error, Reason} ->
-            shutdown(Reason, State);
-        {error, Reason, ProtoState1} ->
-            shutdown(Reason, State#state{proto_state = ProtoState1})
+            shutdown(Reason, State)
     end;
 
-websocket_info(emit_stats, State = #state{proto_state = ProtoState}) ->
-    Stats = lists:append([wsock_stats(), emqx_misc:proc_stats(),
-                          emqx_protocol:stats(ProtoState)]),
-    emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), Stats),
+websocket_info({timeout, Timer, emit_stats},
+               State = #state{stats_timer = Timer, proto_state = ProtoState}) ->
+    emqx_cm:set_conn_stats(emqx_protocol:client_id(ProtoState), stats(State)),
     {ok, State#state{stats_timer = undefined}, hibernate};
 
 websocket_info({keepalive, start, Interval}, State) ->
@@ -254,30 +263,40 @@ websocket_info(Info, State) ->
     ?WSLOG(error, "unexpected info: ~p", [Info], State),
     {ok, State}.
 
-terminate(SockError, _Req, #state{keepalive       = Keepalive,
-                                  proto_state     = ProtoState,
-                                  shutdown_reason = Reason}) ->
+terminate(SockError, _Req, State = #state{keepalive   = Keepalive,
+                                          proto_state = ProtoState,
+                                          shutdown    = Shutdown}) ->
+    ?WSLOG(debug, "Terminated for ~p, sockerror: ~p",
+           [Shutdown, SockError], State),
     emqx_keepalive:cancel(Keepalive),
-    io:format("Websocket shutdown for ~p, sockerror: ~p~n", [Reason, SockError]),
-    case Reason of
-        undefined ->
-            ok;
-        _ ->
-            emqx_protocol:shutdown(Reason, ProtoState)
+    case {ProtoState, Shutdown} of
+        {undefined, _} -> ok;
+        {_, {shutdown, Reason}} ->
+            emqx_protocol:shutdown(Reason, ProtoState);
+        {_, Error} ->
+            emqx_protocol:shutdown(Error, ProtoState)
     end.
 
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
 reset_parser(State = #state{proto_state = ProtoState}) ->
     State#state{parser_state = emqx_protocol:parser(ProtoState)}.
 
 ensure_stats_timer(State = #state{enable_stats = true,
                                   stats_timer  = undefined,
-                                  idle_timeout = Timeout}) ->
-    State#state{stats_timer = erlang:send_after(Timeout, self(), emit_stats)};
+                                  idle_timeout = IdleTimeout}) ->
+    State#state{stats_timer = emqx_misc:start_timer(IdleTimeout, emit_stats)};
 ensure_stats_timer(State) ->
     State.
 
 shutdown(Reason, State) ->
-    {stop, State#state{shutdown_reason = Reason}}.
+    {stop, State#state{shutdown = Reason}}.
+
+stop(Error, State) ->
+    {stop, State#state{shutdown = Error}}.
 
 wsock_stats() ->
     [{Key, get(Key)} || Key <- ?SOCK_STATS].
+