Просмотр исходного кода

Add the new 'emqx_limiter' module

Feng Lee 6 лет назад
Родитель
Сommit
2b1b58fc66
8 измененных файлов с 612 добавлено и 286 удалено
  1. 1 2
      src/emqx_cm.erl
  2. 224 168
      src/emqx_connection.erl
  3. 11 5
      src/emqx_gc.erl
  4. 73 0
      src/emqx_limiter.erl
  5. 79 33
      src/emqx_misc.erl
  6. 4 4
      src/emqx_pd.erl
  7. 6 0
      src/emqx_types.erl
  8. 214 74
      src/emqx_ws_connection.erl

+ 1 - 2
src/emqx_cm.erl

@@ -1,4 +1,4 @@
-%%--------------------------------------------------------------------
+%%-------------------------------------------------------------------
 %% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
@@ -93,7 +93,6 @@ start_link() ->
 %%--------------------------------------------------------------------
 
 %% @doc Register a channel.
-%% Channel will be unregistered automatically when the channel process dies
 -spec(register_channel(emqx_types:clientid()) -> ok).
 register_channel(ClientId) when is_binary(ClientId) ->
     register_channel(ClientId, self()).

+ 224 - 168
src/emqx_connection.erl

@@ -14,7 +14,7 @@
 %% limitations under the License.
 %%--------------------------------------------------------------------
 
-%% MQTT/TCP Connection
+%% MQTT/TCP|TLS Connection
 -module(emqx_connection).
 
 -include("emqx.hrl").
@@ -40,7 +40,7 @@
 
 -export([call/2]).
 
-%% callback
+%% Callback
 -export([init/4]).
 
 %% Sys callbacks
@@ -50,12 +50,15 @@
         , system_get_state/1
         ]).
 
-%% Internal callbacks
--export([wakeup_from_hib/2]).
+%% Internal callback
+-export([wakeup_from_hib/3]).
+
+-import(emqx_misc,
+        [ maybe_apply/2
+        , start_timer/2
+        ]).
 
 -record(state, {
-          %% Parent
-          parent :: pid(),
           %% TCP/TLS Transport
           transport :: esockd:transport(),
           %% TCP/TLS Socket
@@ -64,34 +67,37 @@
           peername :: emqx_types:peername(),
           %% Sockname of the connection
           sockname :: emqx_types:peername(),
-          %% Sock state
+          %% Sock State
           sockstate :: emqx_types:sockstate(),
           %% The {active, N} option
           active_n :: pos_integer(),
-          %% Publish Limit
-          pub_limit :: maybe(esockd_rate_limit:bucket()),
-          %% Rate Limit
-          rate_limit :: maybe(esockd_rate_limit:bucket()),
+          %% Limiter
+          limiter :: maybe(emqx_limiter:limiter()),
           %% Limit Timer
           limit_timer :: maybe(reference()),
-          %% Parser State
+          %% Parse State
           parse_state :: emqx_frame:parse_state(),
           %% Serialize function
           serialize :: emqx_frame:serialize_fun(),
           %% Channel State
           channel :: emqx_channel:channel(),
-          %% Idle timer
-          idle_timer :: reference()
+          %% GC State
+          gc_state :: maybe(emqx_gc:gc_state()),
+          %% Stats Timer
+          stats_timer :: disabled | maybe(reference()),
+          %% Idle Timer
+          idle_timer :: maybe(reference())
         }).
 
 -type(state() :: #state{}).
 
 -define(ACTIVE_N, 100).
--define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n,
-                    pub_limit, rate_limit]).
+-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]).
 -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
 
+-define(ENABLED(X), (X =/= undefined)).
+
 -spec(start_link(esockd:transport(), esockd:socket(), proplists:proplist())
       -> {ok, pid()}).
 start_link(Transport, Socket, Options) ->
@@ -123,13 +129,8 @@ info(sockstate, #state{sockstate = SockSt}) ->
     SockSt;
 info(active_n, #state{active_n = ActiveN}) ->
     ActiveN;
-info(pub_limit, #state{pub_limit = PubLimit}) ->
-    limit_info(PubLimit);
-info(rate_limit, #state{rate_limit = RateLimit}) ->
-    limit_info(RateLimit).
-
-limit_info(Limit) ->
-    emqx_misc:maybe_apply(fun esockd_rate_limit:info/1, Limit).
+info(limiter, #state{limiter = Limiter}) ->
+    maybe_apply(fun emqx_limiter:info/1, Limiter).
 
 %% @doc Get stats of the connection/channel.
 -spec(stats(pid()|state()) -> emqx_types:stats()).
@@ -147,6 +148,13 @@ stats(#state{transport = Transport,
     ProcStats = emqx_misc:proc_stats(),
     lists:append([SockStats, ConnStats, ChanStats, ProcStats]).
 
+attrs(#state{active_n = ActiveN, sockstate = SockSt, channel = Channel}) ->
+    SockAttrs = #{active_n  => ActiveN,
+                  sockstate => SockSt
+                 },
+    ChanAttrs = emqx_channel:attrs(Channel),
+    maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
+
 call(Pid, Req) ->
     gen_server:call(Pid, Req, infinity).
 
@@ -169,7 +177,6 @@ init(Parent, Transport, RawSocket, Options) ->
 do_init(Parent, Transport, Socket, Options) ->
     {ok, Peername} = Transport:ensure_ok_or_exit(peername, [Socket]),
     {ok, Sockname} = Transport:ensure_ok_or_exit(sockname, [Socket]),
-    emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
     Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
     ConnInfo = #{socktype => Transport:type(Socket),
                  peername => Peername,
@@ -179,42 +186,39 @@ do_init(Parent, Transport, Socket, Options) ->
                 },
     Zone = proplists:get_value(zone, Options),
     ActiveN = proplists:get_value(active_n, Options, ?ACTIVE_N),
-    PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
-    RateLimit = init_limiter(proplists:get_value(rate_limit, Options)),
-    FrameOpts = emqx_zone:frame_options(Zone),
+    Limiter = emqx_limiter:init(Options),
+    FrameOpts = emqx_zone:mqtt_frame_options(Zone),
     ParseState = emqx_frame:initial_parse_state(FrameOpts),
     Serialize = emqx_frame:serialize_fun(),
     Channel = emqx_channel:init(ConnInfo, Options),
-    IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
-    IdleTimer = emqx_misc:start_timer(IdleTimout, idle_timeout),
-    HibAfterTimeout = emqx_zone:get_env(Zone, hibernate_after, IdleTimout*2),
-    State = #state{parent      = Parent,
-                   transport   = Transport,
+    GcState = emqx_zone:init_gc_state(Zone),
+    StatsTimer = emqx_zone:stats_timer(Zone),
+    IdleTimeout = emqx_zone:idle_timeout(Zone),
+    IdleTimer = start_timer(IdleTimeout, idle_timeout),
+    emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)),
+    emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
+    State = #state{transport   = Transport,
                    socket      = Socket,
                    peername    = Peername,
                    sockname    = Sockname,
                    sockstate   = idle,
                    active_n    = ActiveN,
-                   pub_limit   = PubLimit,
-                   rate_limit  = RateLimit,
+                   limiter     = Limiter,
                    parse_state = ParseState,
                    serialize   = Serialize,
                    channel     = Channel,
+                   gc_state    = GcState,
+                   stats_timer = StatsTimer,
                    idle_timer  = IdleTimer
                   },
     case activate_socket(State) of
         {ok, NState} ->
-            hibernate(NState, #{hibernate_after => HibAfterTimeout});
+            hibernate(Parent, NState, #{idle_timeout => IdleTimeout});
         {error, Reason} ->
             ok = Transport:fast_close(Socket),
             exit_on_sock_error(Reason)
     end.
 
--compile({inline, [init_limiter/1]}).
-init_limiter(undefined) -> undefined;
-init_limiter({Rate, Burst}) ->
-    esockd_rate_limit:new(Rate, Burst).
-
 exit_on_sock_error(Reason) when Reason =:= einval;
                                 Reason =:= enotconn;
                                 Reason =:= closed ->
@@ -227,8 +231,7 @@ exit_on_sock_error(Reason) ->
 %%--------------------------------------------------------------------
 %% Recv Loop
 
-recvloop(State = #state{parent = Parent},
-         Options = #{hibernate_after := HibAfterTimeout}) ->
+recvloop(Parent, State, Options = #{idle_timeout := IdleTimeout}) ->
     receive
         {system, From, Request} ->
             sys:handle_system_msg(Request, From, Parent,
@@ -236,33 +239,49 @@ recvloop(State = #state{parent = Parent},
         {'EXIT', Parent, Reason} ->
             terminate(Reason, State);
         Msg ->
-            process_msg([Msg], State, Options)
+            NState = ensure_stats_timer(IdleTimeout, State),
+            process_msg([Msg], Parent, NState, Options)
     after
-        HibAfterTimeout ->
-            hibernate(State, Options)
+        IdleTimeout ->
+            NState = cancel_stats_timer(State),
+            hibernate(Parent, NState, Options)
     end.
 
-hibernate(State, Options) ->
-    proc_lib:hibernate(?MODULE, wakeup_from_hib, [State, Options]).
+hibernate(Parent, State, Options) ->
+    proc_lib:hibernate(?MODULE, wakeup_from_hib, [Parent, State, Options]).
 
-wakeup_from_hib(State, Options) ->
+wakeup_from_hib(Parent, State, Options) ->
     %% Maybe do something later here.
-    recvloop(State, Options).
+    recvloop(Parent, State, Options).
+
+%%--------------------------------------------------------------------
+%% Ensure/cancel stats timer
+
+-compile({inline, [ensure_stats_timer/2]}).
+ensure_stats_timer(Timeout, State = #state{stats_timer = undefined}) ->
+    State#state{stats_timer = start_timer(Timeout, emit_stats)};
+ensure_stats_timer(_Timeout, State) -> State.
+
+-compile({inline, [cancel_stats_timer/1]}).
+cancel_stats_timer(State = #state{stats_timer = TRef}) when is_reference(TRef) ->
+    ok = emqx_misc:cancel_timer(TRef),
+    State#state{stats_timer = undefined};
+cancel_stats_timer(State) -> State.
 
 %%--------------------------------------------------------------------
 %% Process next Msg
 
-process_msg([], State, Options) ->
-    recvloop(State, Options);
+process_msg([], Parent, State, Options) ->
+    recvloop(Parent, State, Options);
 
-process_msg([Msg|More], State, Options) ->
+process_msg([Msg|More], Parent, State, Options) ->
     case catch handle_msg(Msg, State) of
         ok ->
-            process_msg(More, State, Options);
+            process_msg(More, Parent, State, Options);
         {ok, NState} ->
-            process_msg(More, NState, Options);
-        {ok, NextMsgs, NState} ->
-            process_msg(append_msg(NextMsgs, More), NState, Options);
+            process_msg(More, Parent, NState, Options);
+        {ok, Msgs, NState} ->
+            process_msg(append_msg(Msgs, More), Parent, NState, Options);
         {stop, Reason} ->
             terminate(Reason, State);
         {stop, Reason, NState} ->
@@ -284,14 +303,12 @@ handle_msg({'$gen_call', From, Req}, State) ->
             stop(Reason, NState)
     end;
 
-handle_msg({Inet, _Sock, Data}, State = #state{channel = Channel})
-  when Inet == tcp; Inet == ssl ->
+handle_msg({Inet, _Sock, Data}, State) when Inet == tcp; Inet == ssl ->
     ?LOG(debug, "RECV ~p", [Data]),
     Oct = iolist_size(Data),
-    emqx_pd:update_counter(incoming_bytes, Oct),
+    emqx_pd:inc_counter(incoming_bytes, Oct),
     ok = emqx_metrics:inc('bytes.received', Oct),
-    NChannel = emqx_channel:recvd(Oct, Channel),
-    parse_incoming(Data, State#state{channel = NChannel});
+    parse_incoming(Data, State);
 
 handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
            State = #state{idle_timer = IdleTimer}) ->
@@ -302,6 +319,9 @@ handle_msg({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
                         },
     handle_incoming(Packet, NState);
 
+handle_msg({incoming, ?PACKET(?PINGREQ)}, State) ->
+    handle_outgoing(?PACKET(?PINGRESP), State);
+
 handle_msg({incoming, Packet}, State) ->
     handle_incoming(Packet, State);
 
@@ -315,30 +335,34 @@ handle_msg({Closed, _Sock}, State)
 
 handle_msg({Passive, _Sock}, State)
   when Passive == tcp_passive; Passive == ssl_passive ->
-    %% Rate limit here:)
-    NState = ensure_rate_limit(State),
-    handle_info(activate_socket, NState);
-
-%% Rate limit timer expired.
-handle_msg(activate_socket, State) ->
-    NState = State#state{sockstate   = idle,
-                         limit_timer = undefined
-                        },
-    handle_info(activate_socket, NState);
+    InStats = #{cnt => emqx_pd:reset_counter(incoming_pubs),
+                oct => emqx_pd:reset_counter(incoming_bytes)
+               },
+    %% Ensure Rate Limit
+    NState = ensure_rate_limit(InStats, State),
+    %% Run GC and Check OOM
+    NState1 = check_oom(run_gc(InStats, NState)),
+    handle_info(activate_socket, NState1);
 
 handle_msg(Deliver = {deliver, _Topic, _Msg},
            State = #state{channel = Channel}) ->
-    Delivers = emqx_misc:drain_deliver([Deliver]),
-    Result = emqx_channel:handle_out(Delivers, Channel),
-    handle_return(Result, State);
+    Delivers = [Deliver|emqx_misc:drain_deliver()],
+    Ret = emqx_channel:handle_out(Delivers, Channel),
+    handle_chan_return(Ret, State);
 
 handle_msg({outgoing, Packets}, State) ->
-    NState = handle_outgoing(Packets, State),
-    {ok, NState};
-
-%% something sent
-handle_msg({inet_reply, _Sock, ok}, _State) ->
-    ok;
+    handle_outgoing(Packets, State);
+
+%% Something sent
+handle_msg({inet_reply, _Sock, ok}, State = #state{active_n = ActiveN}) ->
+    case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
+        true ->
+            OutStats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
+                         oct => emqx_pd:reset_counter(outgoing_bytes)
+                        },
+            {ok, check_oom(run_gc(OutStats, State))};
+        false -> ok
+    end;
 
 handle_msg({inet_reply, _Sock, {error, Reason}}, State) ->
     handle_info({sock_error, Reason}, State);
@@ -349,7 +373,8 @@ handle_msg({timeout, TRef, TMsg}, State) ->
 handle_msg(Shutdown = {shutdown, _Reason}, State) ->
     stop(Shutdown, State);
 
-handle_msg(Msg, State) -> handle_info(Msg, State).
+handle_msg(Msg, State) ->
+    handle_info(Msg, State).
 
 %%--------------------------------------------------------------------
 %% Terminate
@@ -363,8 +388,8 @@ terminate(Reason, State = #state{channel = Channel}) ->
 %%--------------------------------------------------------------------
 %% Sys callbacks
 
-system_continue(_Parent, _Deb, {State, Options}) ->
-    recvloop(State, Options).
+system_continue(Parent, _Deb, {State, Options}) ->
+    recvloop(Parent, State, Options).
 
 system_terminate(Reason, _Parent, _Deb, {State, _}) ->
     terminate(Reason, State).
@@ -392,8 +417,8 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
             shutdown(Reason, Reply, State#state{channel = NChannel});
         {shutdown, Reason, Reply, OutPacket, NChannel} ->
             NState = State#state{channel = NChannel},
-            NState1 = handle_outgoing(OutPacket, NState),
-            shutdown(Reason, Reply, NState1)
+            ok = handle_outgoing(OutPacket, NState),
+            shutdown(Reason, Reply, NState)
     end.
 
 %%--------------------------------------------------------------------
@@ -402,8 +427,18 @@ handle_call(_From, Req, State = #state{channel = Channel}) ->
 handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
     shutdown(idle_timeout, State);
 
-handle_timeout(TRef, emit_stats, State) ->
-    handle_timeout(TRef, {emit_stats, stats(State)}, State);
+handle_timeout(TRef, limit_timeout, State = #state{limit_timer = TRef}) ->
+    NState = State#state{sockstate   = idle,
+                         limit_timer = undefined
+                        },
+    handle_info(activate_socket, NState);
+
+handle_timeout(TRef, emit_stats, State = #state{stats_timer = TRef,
+                                                channel = Channel}) ->
+    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
+    (ClientId =/= undefined) andalso
+        emqx_cm:set_chan_stats(ClientId, stats(State)),
+    {ok, State#state{stats_timer = undefined}};
 
 handle_timeout(TRef, keepalive, State = #state{transport = Transport,
                                                socket    = Socket}) ->
@@ -415,7 +450,8 @@ handle_timeout(TRef, keepalive, State = #state{transport = Transport,
     end;
 
 handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
-    handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
+    Ret = emqx_channel:handle_timeout(TRef, Msg, Channel),
+    handle_chan_return(Ret, State).
 
 %%--------------------------------------------------------------------
 %% Parse incoming data
@@ -450,30 +486,30 @@ next_incoming_msgs(Packets) ->
 %%--------------------------------------------------------------------
 %% Handle incoming packet
 
-handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
-    _ = inc_incoming_stats(Type),
-    ok = emqx_metrics:inc_recv(Packet),
+handle_incoming(Packet, State = #state{channel = Channel})
+  when is_record(Packet, mqtt_packet) ->
+    ok = inc_incoming_stats(Packet),
     ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
-    handle_return(emqx_channel:handle_in(Packet, Channel), State);
+    handle_chan_return(emqx_channel:handle_in(Packet, Channel), State);
 
 handle_incoming(FrameError, State = #state{channel = Channel}) ->
-    handle_return(emqx_channel:handle_in(FrameError, Channel), State).
+    handle_chan_return(emqx_channel:handle_in(FrameError, Channel), State).
 
 %%--------------------------------------------------------------------
 %% Handle channel return
 
-handle_return(ok, State) ->
+handle_chan_return(ok, State) ->
     {ok, State};
-handle_return({ok, NChannel}, State) ->
+handle_chan_return({ok, NChannel}, State) ->
     {ok, State#state{channel = NChannel}};
-handle_return({ok, Replies, NChannel}, State) ->
+handle_chan_return({ok, Replies, NChannel}, State) ->
     {ok, next_msgs(Replies), State#state{channel = NChannel}};
-handle_return({shutdown, Reason, NChannel}, State) ->
+handle_chan_return({shutdown, Reason, NChannel}, State) ->
     shutdown(Reason, State#state{channel = NChannel});
-handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
+handle_chan_return({shutdown, Reason, OutPacket, NChannel}, State) ->
     NState = State#state{channel = NChannel},
-    NState1 = handle_outgoing(OutPacket, NState),
-    shutdown(Reason, NState1).
+    ok = handle_outgoing(OutPacket, NState),
+    shutdown(Reason, NState).
 
 %%--------------------------------------------------------------------
 %% Handle outgoing packets
@@ -485,14 +521,13 @@ handle_outgoing(Packet, State) ->
     send((serialize_and_inc_stats_fun(State))(Packet), State).
 
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
-    fun(Packet = ?PACKET(Type)) ->
+    fun(Packet) ->
         case Serialize(Packet) of
             <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
                          [emqx_packet:format(Packet)]),
                     <<>>;
-            Data -> _ = inc_outgoing_stats(Type),
-                    _ = emqx_metrics:inc_sent(Packet),
-                    ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
+            Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
+                    ok = inc_outgoing_stats(Packet),
                     Data
         end
     end.
@@ -500,52 +535,52 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
 %%--------------------------------------------------------------------
 %% Send data
 
-send(IoData, State = #state{transport = Transport,
-                            socket    = Socket,
-                            channel   = Channel}) ->
+-spec(send(iodata(), state()) -> ok).
+send(IoData, #state{transport = Transport, socket = Socket}) ->
     Oct = iolist_size(IoData),
     ok = emqx_metrics:inc('bytes.sent', Oct),
+    emqx_pd:inc_counter(outgoing_bytes, Oct),
     case Transport:async_send(Socket, IoData) of
-        ok ->
-            State#state{channel = emqx_channel:sent(Oct, Channel)};
+        ok -> ok;
         Error = {error, _Reason} ->
-            %% Simulate an inet_reply to postpone handling the error
-            self() ! {inet_reply, Socket, Error}, State
+            %% Send an inet_reply to postpone handling the error
+            self() ! {inet_reply, Socket, Error},
+            ok
     end.
 
 %%--------------------------------------------------------------------
 %% Handle Info
 
-handle_info({connack, ConnAck}, State = #state{active_n  = ActiveN,
-                                                sockstate = SockSt,
-                                                channel   = Channel}) ->
-    NState = handle_outgoing(ConnAck, State),
-    ChanAttrs = emqx_channel:attrs(Channel),
-    SockAttrs = #{active_n  => ActiveN,
-                  sockstate => SockSt
-                 },
-    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
-    handle_info({register, Attrs, stats(State)}, NState);
+handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
+    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
+    ok = emqx_cm:register_channel(ClientId),
+    ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)),
+    ok = emqx_cm:set_chan_stats(ClientId, stats(State)),
+    ok = handle_outgoing(ConnAck, State);
 
-handle_info({enter, disconnected}, State = #state{active_n  = ActiveN,
-                                                sockstate = SockSt,
-                                                channel   = Channel}) ->
-    ChanAttrs = emqx_channel:attrs(Channel),
-    SockAttrs = #{active_n  => ActiveN,
-                  sockstate => SockSt
-                 },
-    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
-    handle_info({register, Attrs, stats(State)}, State);
+handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
+    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
+    emqx_cm:set_chan_attrs(ClientId, attrs(State)),
+    emqx_cm:set_chan_stats(ClientId, stats(State));
 
-handle_info(activate_socket, State) ->
+handle_info(activate_socket, State = #state{sockstate = OldSst}) ->
     case activate_socket(State) of
-        {ok, NState} -> {ok, NState};
+        {ok, NState = #state{sockstate = NewSst}} ->
+            if OldSst =/= NewSst ->
+                   {ok, {event, sockstate_changed}, NState};
+               true -> {ok, NState}
+            end;
         {error, Reason} ->
             handle_info({sock_error, Reason}, State)
     end;
 
+handle_info({event, sockstate_changed}, State = #state{channel = Channel}) ->
+    #{clientid := ClientId} = emqx_channel:info(clientinfo, Channel),
+    ClientId =/= undefined andalso emqx_cm:set_chan_attrs(ClientId, attrs(State));
+
 %%TODO: this is not right
-handle_info({sock_error, _Reason}, #state{sockstate = closed}) -> ok;
+handle_info({sock_error, _Reason}, #state{sockstate = closed}) ->
+    ok;
 handle_info({sock_error, Reason}, State) ->
     ?LOG(debug, "Socket error: ~p", [Reason]),
     handle_info({sock_closed, Reason}, close_socket(State));
@@ -560,7 +595,45 @@ handle_info({close, Reason}, State) ->
     handle_info({sock_closed, Reason}, close_socket(State));
 
 handle_info(Info, State = #state{channel = Channel}) ->
-    handle_return(emqx_channel:handle_info(Info, Channel), State).
+    handle_chan_return(emqx_channel:handle_info(Info, Channel), State).
+
+%%--------------------------------------------------------------------
+%% Ensure rate limit
+
+ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
+    case ?ENABLED(limiter) andalso emqx_limiter:check(Stats, Limiter) of
+        false -> State;
+        {ok, Limiter1} ->
+            State#state{limiter = Limiter1};
+        {pause, Time, Limiter1} ->
+            ?LOG(debug, "Pause ~pms due to rate limit", [Time]),
+            TRef = start_timer(Time, limit_timeout),
+            State#state{sockstate   = blocked,
+                        limiter     = Limiter1,
+                        limit_timer = TRef
+                       }
+    end.
+
+%%--------------------------------------------------------------------
+%% Run GC and Check OOM
+
+run_gc(Stats, State = #state{gc_state = GcSt}) ->
+    case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
+        false -> State;
+        {IsGC, GcSt1} ->
+            IsGC andalso emqx_metrics:inc('channel.gc.cnt'),
+            State#state{gc_state = GcSt1}
+    end.
+
+check_oom(State = #state{channel = Channel}) ->
+    #{zone := Zone} = emqx_channel:info(clientinfo, Channel),
+    OomPolicy = emqx_zone:oom_policy(Zone),
+    case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
+        Shutdown = {shutdown, _Reason} ->
+            erlang:send(self(), Shutdown);
+        _Other -> ok
+    end,
+    State.
 
 %%--------------------------------------------------------------------
 %% Activate Socket
@@ -587,48 +660,30 @@ close_socket(State = #state{transport = Transport, socket = Socket}) ->
     ok = Transport:fast_close(Socket),
     State#state{sockstate = closed}.
 
-%%--------------------------------------------------------------------
-%% Ensure rate limit
-
--define(ENABLED(Rl), (Rl =/= undefined)).
-
-ensure_rate_limit(State = #state{rate_limit = Rl, pub_limit = Pl}) ->
-    Pubs = emqx_pd:reset_counter(incoming_pubs),
-    Bytes = emqx_pd:reset_counter(incoming_bytes),
-    Limiters = [{Pl, #state.pub_limit, Pubs} || ?ENABLED(Pl)] ++
-               [{Rl, #state.rate_limit, Bytes} || ?ENABLED(Rl)],
-    ensure_rate_limit(Limiters, State).
-
-ensure_rate_limit([], State) ->
-    State;
-ensure_rate_limit([{Rl, Pos, Cnt}|Limiters], State) ->
-    case esockd_rate_limit:check(Cnt, Rl) of
-        {0, Rl1} ->
-            ensure_rate_limit(Limiters, setelement(Pos, State, Rl1));
-        {Pause, Rl1} ->
-            ?LOG(debug, "Pause ~pms due to rate limit", [Pause]),
-            TRef = erlang:send_after(Pause, self(), activate_socket),
-            NState = State#state{sockstate = blocked, limit_timer = TRef},
-            setelement(Pos, NState, Rl1)
-    end.
-
 %%--------------------------------------------------------------------
 %% Inc incoming/outgoing stats
 
 -compile({inline, [inc_incoming_stats/1]}).
-inc_incoming_stats(Type) when is_integer(Type) ->
-    emqx_pd:update_counter(recv_pkt, 1),
+inc_incoming_stats(Packet = ?PACKET(Type)) ->
+    emqx_pd:inc_counter(recv_pkt, 1),
     if
         Type == ?PUBLISH ->
-            emqx_pd:update_counter(recv_msg, 1),
-            emqx_pd:update_counter(incoming_pubs, 1);
+            emqx_pd:inc_counter(recv_msg, 1),
+            emqx_pd:inc_counter(incoming_pubs, 1);
         true -> ok
-    end.
+    end,
+    emqx_metrics:inc_recv(Packet).
 
 -compile({inline, [inc_outgoing_stats/1]}).
-inc_outgoing_stats(Type) ->
-    emqx_pd:update_counter(send_pkt, 1),
-    (Type == ?PUBLISH) andalso emqx_pd:update_counter(send_msg, 1).
+inc_outgoing_stats(Packet = ?PACKET(Type)) ->
+    emqx_pd:inc_counter(send_pkt, 1),
+    if
+        Type == ?PUBLISH ->
+            emqx_pd:inc_counter(send_msg, 1),
+            emqx_pd:inc_counter(outgoing_pubs, 1);
+        true -> ok
+    end,
+    emqx_metrics:inc_sent(Packet).
 
 %%--------------------------------------------------------------------
 %% Helper functions
@@ -646,13 +701,14 @@ next_msgs(Action) when is_tuple(Action) ->
 next_msgs(Actions) when is_list(Actions) ->
     Actions.
 
+-compile({inline, [shutdown/2, shutdown/3]}).
 shutdown(Reason, State) ->
     stop({shutdown, Reason}, State).
 
 shutdown(Reason, Reply, State) ->
     stop({shutdown, Reason}, Reply, State).
 
--compile({inline, [stop/2]}).
+-compile({inline, [stop/2, stop/3]}).
 stop(Reason, State) ->
     {stop, Reason, State}.
 

+ 11 - 5
src/emqx_gc.erl

@@ -29,6 +29,7 @@
 -include("types.hrl").
 
 -export([ init/1
+        , run/2
         , run/3
         , info/1
         , reset/1
@@ -57,21 +58,26 @@ init(#{count := Count, bytes := Bytes}) ->
     ?GCS(maps:from_list(Cnt ++ Oct)).
 
 %% @doc Try to run GC based on reduntions of count or bytes.
+-spec(run(#{cnt := pos_integer(), oct := pos_integer()}, gc_state())
+      -> {boolean(), gc_state()}).
+run(#{cnt := Cnt, oct := Oct}, GcSt) ->
+    run(Cnt, Oct, GcSt).
+
 -spec(run(pos_integer(), pos_integer(), gc_state())
       -> {boolean(), gc_state()}).
 run(Cnt, Oct, ?GCS(St)) ->
-    {Res, St1} = run([{cnt, Cnt}, {oct, Oct}], St),
+    {Res, St1} = do_run([{cnt, Cnt}, {oct, Oct}], St),
     {Res, ?GCS(St1)}.
 
-run([], St) ->
+do_run([], St) ->
     {false, St};
-run([{K, N}|T], St) ->
+do_run([{K, N}|T], St) ->
     case dec(K, N, St) of
         {true, St1} ->
-            true = erlang:garbage_collect(),
+            erlang:garbage_collect(),
             {true, do_reset(St1)};
         {false, St1} ->
-            run(T, St1)
+            do_run(T, St1)
     end.
 
 %% @doc Info of GC state.

+ 73 - 0
src/emqx_limiter.erl

@@ -0,0 +1,73 @@
+%%--------------------------------------------------------------------
+%% Copyright (c) 2019 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%--------------------------------------------------------------------
+
+-module(emqx_limiter).
+
+-include("types.hrl").
+
+-export([init/1, info/1, check/2]).
+
+-import(emqx_misc, [maybe_apply/2]).
+
+-record(limiter, {
+          %% Publish Limit
+          pub_limit :: maybe(esockd_rate_limit:bucket()),
+          %% Rate Limit
+          rate_limit :: maybe(esockd_rate_limit:bucket())
+         }).
+
+-type(limiter() :: #limiter{}).
+
+-export_type([limiter/0]).
+
+-define(ENABLED(Rl), (Rl =/= undefined)).
+
+-spec(init(proplists:proplist()) -> maybe(limiter())).
+init(Options) ->
+    Zone = proplists:get_value(zone, Options),
+    Pl = emqx_zone:publish_limit(Zone),
+    Rl = proplists:get_value(rate_limit, Options),
+    case ?ENABLED(Pl) or ?ENABLED(Rl) of
+        true  -> #limiter{pub_limit  = init_limit(Pl),
+                          rate_limit = init_limit(Rl)
+                         };
+        false -> undefined
+    end.
+
+init_limit(Rl) ->
+    maybe_apply(fun esockd_rate_limit:new/1, Rl).
+
+info(#limiter{pub_limit = Pl, rate_limit = Rl}) ->
+    #{pub_limit => info(Pl), rate_limit => info(Rl)};
+
+info(Rl) ->
+    maybe_apply(fun esockd_rate_limit:info/1, Rl).
+
+check(#{cnt := Cnt, oct := Oct}, Limiter = #limiter{pub_limit  = Pl,
+                                                    rate_limit = Rl}) ->
+    do_check([{#limiter.pub_limit, Cnt, Pl} || ?ENABLED(Pl)] ++
+             [{#limiter.rate_limit, Oct, Rl} || ?ENABLED(Rl)], Limiter).
+
+do_check([], Limiter) ->
+    {ok, Limiter};
+do_check([{Pos, Cnt, Rl}|More], Limiter) ->
+    case esockd_rate_limit:check(Cnt, Rl) of
+        {0, Rl1} ->
+            do_check(More, setelement(Pos, Limiter, Rl1));
+        {Pause, Rl1} ->
+            {pause, Pause, setelement(Pos, Limiter, Rl1)}
+    end.
+

+ 79 - 33
src/emqx_misc.erl

@@ -16,6 +16,8 @@
 
 -module(emqx_misc).
 
+-compile(inline).
+
 -include("types.hrl").
 -include("logger.hrl").
 
@@ -26,22 +28,20 @@
         , start_timer/2
         , start_timer/3
         , cancel_timer/1
+        , drain_deliver/0
+        , drain_down/1
+        , check_oom/1
+        , check_oom/2
+        , tune_heap_size/1
         , proc_name/2
         , proc_stats/0
         , proc_stats/1
+        , rand_seed/0
+        , now_to_secs/1
+        , now_to_ms/1
         , index_of/2
         ]).
 
--export([ drain_deliver/0
-        , drain_deliver/1
-        , drain_down/1
-        ]).
-
--compile({inline,
-          [ start_timer/2
-          , start_timer/3
-          ]}).
-
 %% @doc Merge options
 -spec(merge_opts(Opts, Opts) -> Opts when Opts :: proplists:proplist()).
 merge_opts(Defaults, Options) ->
@@ -112,27 +112,7 @@ cancel_timer(Timer) when is_reference(Timer) ->
     end;
 cancel_timer(_) -> ok.
 
--spec(proc_name(atom(), pos_integer()) -> atom()).
-proc_name(Mod, Id) ->
-    list_to_atom(lists:concat([Mod, "_", Id])).
-
-%% Get Proc's Stats.
--spec(proc_stats() -> emqx_types:stats()).
-proc_stats() -> proc_stats(self()).
-
--spec(proc_stats(pid()) -> emqx_types:stats()).
-proc_stats(Pid) ->
-    case process_info(Pid, [message_queue_len,
-                            heap_size,
-                            total_heap_size,
-                            reductions,
-                            memory]) of
-        undefined -> [];
-        [{message_queue_len, Len}|ProcStats] ->
-            [{mailbox_len, Len}|ProcStats]
-    end.
-
-%% @doc Drain delivers from the channel's mailbox.
+%% @doc Drain delivers from the channel proc's mailbox.
 drain_deliver() ->
     drain_deliver([]).
 
@@ -144,7 +124,7 @@ drain_deliver(Acc) ->
         lists:reverse(Acc)
     end.
 
-%% @doc Drain process down events.
+%% @doc Drain process 'DOWN' events.
 -spec(drain_down(pos_integer()) -> list(pid())).
 drain_down(Cnt) when Cnt > 0 ->
     drain_down(Cnt, []).
@@ -156,9 +136,75 @@ drain_down(Cnt, Acc) ->
         {'DOWN', _MRef, process, Pid, _Reason} ->
             drain_down(Cnt - 1, [Pid|Acc])
     after 0 ->
-        drain_down(0, Acc)
+        lists:reverse(Acc)
     end.
 
+%% @doc Check process's mailbox and heapsize against OOM policy,
+%% return `ok | {shutdown, Reason}' accordingly.
+%% `ok': There is nothing out of the ordinary.
+%% `shutdown': Some numbers (message queue length hit the limit),
+%%             hence shutdown for greater good (system stability).
+-spec(check_oom(emqx_types:oom_policy()) -> ok | {shutdown, term()}).
+check_oom(Policy) ->
+    check_oom(self(), Policy).
+
+-spec(check_oom(pid(), emqx_types:oom_policy()) -> ok | {shutdown, term()}).
+check_oom(Pid, #{message_queue_len := MaxQLen,
+                 max_heap_size := MaxHeapSize}) ->
+    case process_info(Pid, [message_queue_len, total_heap_size]) of
+        undefined -> ok;
+        [{message_queue_len, QLen}, {total_heap_size, HeapSize}] ->
+            do_check_oom([{QLen, MaxQLen, message_queue_too_long},
+                          {HeapSize, MaxHeapSize, proc_heap_too_large}
+                         ])
+    end.
+
+do_check_oom([]) -> ok;
+do_check_oom([{Val, Max, Reason}|Rest]) ->
+    case is_integer(Max) andalso (0 < Max) andalso (Max < Val) of
+        true  -> {shutdown, Reason};
+        false -> do_check_oom(Rest)
+    end.
+
+tune_heap_size(#{max_heap_size := MaxHeapSize}) ->
+    %% If set to zero, the limit is disabled.
+    erlang:process_flag(max_heap_size, #{size => MaxHeapSize,
+                                         kill => false,
+                                         error_logger => true
+                                        });
+tune_heap_size(undefined) -> ok.
+
+-spec(proc_name(atom(), pos_integer()) -> atom()).
+proc_name(Mod, Id) ->
+    list_to_atom(lists:concat([Mod, "_", Id])).
+
+%% Get Proc's Stats.
+-spec(proc_stats() -> emqx_types:stats()).
+proc_stats() -> proc_stats(self()).
+
+-spec(proc_stats(pid()) -> emqx_types:stats()).
+proc_stats(Pid) ->
+    case process_info(Pid, [message_queue_len,
+                            heap_size,
+                            total_heap_size,
+                            reductions,
+                            memory]) of
+        undefined -> [];
+        [{message_queue_len, Len}|ProcStats] ->
+            [{mailbox_len, Len}|ProcStats]
+    end.
+
+rand_seed() ->
+    rand:seed(exsplus, erlang:timestamp()).
+
+-spec(now_to_secs(erlang:timestamp()) -> pos_integer()).
+now_to_secs({MegaSecs, Secs, _MicroSecs}) ->
+    MegaSecs * 1000000 + Secs.
+
+-spec(now_to_ms(erlang:timestamp()) -> pos_integer()).
+now_to_ms({MegaSecs, Secs, MicroSecs}) ->
+    (MegaSecs * 1000000 + Secs) * 1000 + round(MicroSecs/1000).
+
 %% lists:index_of/2
 index_of(E, L) ->
     index_of(E, 1, L).

+ 4 - 4
src/emqx_pd.erl

@@ -21,14 +21,14 @@
 
 -export([ get_counters/1
         , get_counter/1
-        , update_counter/2
+        , inc_counter/2
         , reset_counter/1
         ]).
 
 -compile({inline,
           [ get_counters/1
           , get_counter/1
-          , update_counter/2
+          , inc_counter/2
           , reset_counter/1
           ]}).
 
@@ -42,8 +42,8 @@ get_counters(Keys) when is_list(Keys) ->
 get_counter(Key) ->
     case get(Key) of undefined -> 0; Cnt -> Cnt end.
 
--spec(update_counter(key(), number()) -> maybe(number())).
-update_counter(Key, Inc) ->
+-spec(inc_counter(key(), number()) -> maybe(number())).
+inc_counter(Key, Inc) ->
     put(Key, get_counter(Key) + Inc).
 
 -spec(reset_counter(key()) -> number()).

+ 6 - 0
src/emqx_types.erl

@@ -85,6 +85,8 @@
              , stats/0
              ]).
 
+-export_type([oom_policy/0]).
+
 -type(ver() :: ?MQTT_PROTO_V3
              | ?MQTT_PROTO_V4
              | ?MQTT_PROTO_V5).
@@ -186,3 +188,7 @@
 -type(infos() :: #{atom() => term()}).
 -type(stats() :: #{atom() => non_neg_integer()|stats()}).
 
+-type(oom_policy() :: #{message_queue_len => non_neg_integer(),
+                        max_heap_size => non_neg_integer()
+                       }).
+

+ 214 - 74
src/emqx_ws_connection.erl

@@ -45,6 +45,11 @@
         , terminate/3
         ]).
 
+-import(emqx_misc,
+        [ maybe_apply/2
+        , start_timer/2
+        ]).
+
 -record(state, {
           %% Peername of the ws connection.
           peername :: emqx_types:peername(),
@@ -52,24 +57,41 @@
           sockname :: emqx_types:peername(),
           %% Sock state
           sockstate :: emqx_types:sockstate(),
-          %% Parser State
+          %% Simulate the active_n opt
+          active_n :: pos_integer(),
+          %% Limiter
+          limiter :: emqx_limiter:limiter(),
+          %% Limit Timer
+          limit_timer :: maybe(reference()),
+          %% Parse State
           parse_state :: emqx_frame:parse_state(),
           %% Serialize function
           serialize :: emqx_frame:serialize_fun(),
           %% Channel
           channel :: emqx_channel:channel(),
+          %% GC State
+          gc_state :: maybe(emqx_gc:gc_state()),
           %% Out Pending Packets
           pendings :: list(emqx_types:packet()),
+          %% Stats Timer
+          stats_timer :: disabled | maybe(reference()),
+          %% Idle Timeout
+          idle_timeout :: timeout(),
+          %% Idle Timer
+          idle_timer :: reference(),
           %% The stop reason
           stop_reason :: term()
         }).
 
 -type(state() :: #state{}).
 
--define(INFO_KEYS, [socktype, peername, sockname, sockstate]).
+-define(ACTIVE_N, 100).
+-define(INFO_KEYS, [socktype, peername, sockname, sockstate, active_n, limiter]).
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt]).
 -define(CONN_STATS, [recv_pkt, recv_msg, send_pkt, send_msg]).
 
+-define(ENABLED(X), (X =/= undefined)).
+
 %%--------------------------------------------------------------------
 %% API
 %%--------------------------------------------------------------------
@@ -92,11 +114,20 @@ info(sockname, #state{sockname = Sockname}) ->
     Sockname;
 info(sockstate, #state{sockstate = SockSt}) ->
     SockSt;
+info(active_n, #state{active_n = ActiveN}) ->
+    ActiveN;
+info(limiter, #state{limiter = Limiter}) ->
+    maybe_apply(fun emqx_limiter:info/1, Limiter);
 info(channel, #state{channel = Channel}) ->
     emqx_channel:info(Channel);
 info(stop_reason, #state{stop_reason = Reason}) ->
     Reason.
 
+attrs(State = #state{channel = Channel}) ->
+    ChanAttrs = emqx_channel:attrs(Channel),
+    SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
+    maps:merge(ChanAttrs, #{sockinfo => SockAttrs}).
+
 -spec(stats(pid()|state()) -> emqx_types:stats()).
 stats(WsPid) when is_pid(WsPid) ->
     call(WsPid, stats);
@@ -128,6 +159,7 @@ call(WsPid, Req) when is_pid(WsPid) ->
 %%--------------------------------------------------------------------
 
 init(Req, Opts) ->
+    %% WS Transport Idle Timeout
     IdleTimeout = proplists:get_value(idle_timeout, Opts, 7200000),
     DeflateOptions = maps:from_list(proplists:get_value(deflate_options, Opts, [])),
     MaxFrameSize = case proplists:get_value(max_frame_size, Opts, 0) of
@@ -174,29 +206,41 @@ websocket_init([Req, Opts]) ->
                  conn_mod  => ?MODULE
                 },
     Zone = proplists:get_value(zone, Opts),
-    FrameOpts = emqx_zone:frame_options(Zone),
+    ActiveN = proplists:get_value(active_n, Opts, ?ACTIVE_N),
+    Limiter = emqx_limiter:init(Opts),
+    FrameOpts = emqx_zone:mqtt_frame_options(Zone),
     ParseState = emqx_frame:initial_parse_state(FrameOpts),
     Serialize = emqx_frame:serialize_fun(),
     Channel = emqx_channel:init(ConnInfo, Opts),
+    GcState = emqx_zone:init_gc_state(Zone),
+    StatsTimer = emqx_zone:stats_timer(Zone),
+    %% MQTT Idle Timeout
+    IdleTimeout = emqx_zone:idle_timeout(Zone),
+    IdleTimer = start_timer(IdleTimeout, idle_timeout),
+    emqx_misc:tune_heap_size(emqx_zone:oom_policy(Zone)),
     emqx_logger:set_metadata_peername(esockd_net:format(Peername)),
-    {ok, #state{peername    = Peername,
-                sockname    = Sockname,
-                sockstate   = idle,
-                parse_state = ParseState,
-                serialize   = Serialize,
-                channel     = Channel,
-                pendings    = []
-               }}.
+    {ok, #state{peername     = Peername,
+                sockname     = Sockname,
+                sockstate    = running,
+                active_n     = ActiveN,
+                limiter      = Limiter,
+                parse_state  = ParseState,
+                serialize    = Serialize,
+                channel      = Channel,
+                gc_state     = GcState,
+                pendings     = [],
+                stats_timer  = StatsTimer,
+                idle_timeout = IdleTimeout,
+                idle_timer   = IdleTimer
+               }, hibernate}.
 
 websocket_handle({binary, Data}, State) when is_list(Data) ->
     websocket_handle({binary, iolist_to_binary(Data)}, State);
 
-websocket_handle({binary, Data}, State = #state{channel = Channel}) ->
+websocket_handle({binary, Data}, State) ->
     ?LOG(debug, "RECV ~p", [Data]),
-    Oct = iolist_size(Data),
-    ok = inc_recv_stats(1, Oct),
-    NChannel = emqx_channel:recvd(Oct, Channel),
-    parse_incoming(Data, State#state{channel = NChannel});
+    ok = inc_recv_stats(1, iolist_size(Data)),
+    parse_incoming(Data, ensure_stats_timer(State));
 
 %% Pings should be replied with pongs, cowboy does it automatically
 %% Pongs can be safely ignored. Clause here simply prevents crash.
@@ -215,30 +259,43 @@ websocket_info({call, From, Req}, State) ->
     handle_call(From, Req, State);
 
 websocket_info({cast, Msg}, State = #state{channel = Channel}) ->
-    handle_return(emqx_channel:handle_info(Msg, Channel), State);
+    handle_chan_return(emqx_channel:handle_info(Msg, Channel), State);
 
-websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)}, State) ->
+websocket_info({incoming, Packet = ?CONNECT_PACKET(ConnPkt)},
+               State = #state{idle_timer = IdleTimer}) ->
+    ok = emqx_misc:cancel_timer(IdleTimer),
     Serialize = emqx_frame:serialize_fun(ConnPkt),
-    NState = State#state{sockstate = running,
-                         serialize = Serialize
+    NState = State#state{serialize  = Serialize,
+                         idle_timer = undefined
                         },
     handle_incoming(Packet, NState);
 
+websocket_info({incoming, ?PACKET(?PINGREQ)}, State) ->
+    reply(?PACKET(?PINGRESP), State);
+
 websocket_info({incoming, Packet}, State) ->
     handle_incoming(Packet, State);
 
-websocket_info(Deliver = {deliver, _Topic, _Msg},
-               State = #state{channel = Channel}) ->
-    Delivers = emqx_misc:drain_deliver([Deliver]),
-    Result = emqx_channel:handle_out(Delivers, Channel),
-    handle_return(Result, State);
+websocket_info(rate_limit, State) ->
+    InStats = #{cnt => emqx_pd:reset_counter(incoming_pubs),
+                oct => emqx_pd:reset_counter(incoming_bytes)
+               },
+    erlang:send(self(), {check_gc, InStats}),
+    ensure_rate_limit(InStats, State);
 
-websocket_info({timeout, TRef, keepalive}, State) when is_reference(TRef) ->
-    RecvOct = emqx_pd:get_counter(recv_oct),
-    handle_timeout(TRef, {keepalive, RecvOct}, State);
+websocket_info({check_gc, Stats}, State) ->
+    {ok, check_oom(run_gc(Stats, State))};
 
-websocket_info({timeout, TRef, emit_stats}, State) when is_reference(TRef) ->
-    handle_timeout(TRef, {emit_stats, stats(State)}, State);
+websocket_info({deliver, _Topic, _Msg} = Deliver, State = #state{channel = Channel}) ->
+    Delivers = [Deliver|emqx_misc:drain_deliver()],
+    Ret = emqx_channel:handle_out(Delivers, Channel),
+    handle_chan_return(Ret, State);
+
+websocket_info({timeout, TRef, limit_timeout}, State = #state{limit_timer = TRef}) ->
+    NState = State#state{sockstate   = running,
+                         limit_timer = undefined
+                        },
+    {reply, [{active, true}], NState};
 
 websocket_info({timeout, TRef, Msg}, State) when is_reference(TRef) ->
     handle_timeout(TRef, Msg, State);
@@ -293,27 +350,89 @@ handle_call(From, Req, State = #state{channel = Channel}) ->
 %% Handle Info
 
 handle_info({connack, ConnAck}, State = #state{channel = Channel}) ->
-    ChanAttrs = emqx_channel:attrs(Channel),
-    SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
-    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
-    ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel),
+    ClientId = emqx_channel:info(clientid, Channel),
+    ok = emqx_cm:register_channel(ClientId),
+    ok = emqx_cm:set_chan_attrs(ClientId, attrs(State)),
+    ok = emqx_cm:set_chan_stats(ClientId, stats(State)),
     reply(enqueue(ConnAck, State));
 
 handle_info({enter, disconnected}, State = #state{channel = Channel}) ->
-    ChanAttrs = emqx_channel:attrs(Channel),
-    SockAttrs = maps:from_list(info(?INFO_KEYS, State)),
-    Attrs = maps:merge(ChanAttrs, #{sockinfo => SockAttrs}),
-    ok = emqx_channel:handle_info({register, Attrs, stats(State)}, Channel),
+    ClientId = emqx_channel:info(clientid, Channel),
+    emqx_cm:set_chan_attrs(ClientId, attrs(State)),
+    emqx_cm:set_chan_stats(ClientId, stats(State)),
     reply(State);
 
 handle_info(Info, State = #state{channel = Channel}) ->
-    handle_return(emqx_channel:handle_info(Info, Channel), State).
+    Ret = emqx_channel:handle_info(Info, Channel),
+    handle_chan_return(Ret, State).
 
 %%--------------------------------------------------------------------
 %% Handle timeout
 
-handle_timeout(TRef, Msg, State = #state{channel = Channel}) ->
-    handle_return(emqx_channel:handle_timeout(TRef, Msg, Channel), State).
+handle_timeout(TRef, idle_timeout, State = #state{idle_timer = TRef}) ->
+    shutdown(idle_timeout, State);
+
+handle_timeout(TRef, keepalive, State) when is_reference(TRef) ->
+    RecvOct = emqx_pd:get_counter(recv_oct),
+    handle_timeout(TRef, {keepalive, RecvOct}, State);
+
+handle_timeout(TRef, emit_stats, State = #state{channel = Channel,
+                                                stats_timer = TRef}) ->
+    ClientId = emqx_channel:info(clientid, Channel),
+    (ClientId =/= undefined) andalso emqx_cm:set_chan_stats(ClientId, stats(State)),
+    reply(State#state{stats_timer = undefined});
+
+handle_timeout(TRef, TMsg, State = #state{channel = Channel}) ->
+    Ret = emqx_channel:handle_timeout(TRef, TMsg, Channel),
+    handle_chan_return(Ret, State).
+
+%%--------------------------------------------------------------------
+%% Ensure stats timer
+
+-compile({inline, [ensure_stats_timer/1]}).
+ensure_stats_timer(State = #state{idle_timeout = Timeout,
+                                  stats_timer  = undefined}) ->
+    State#state{stats_timer = start_timer(Timeout, emit_stats)};
+ensure_stats_timer(State) -> State.
+
+%%--------------------------------------------------------------------
+%% Ensure rate limit
+
+ensure_rate_limit(Stats, State = #state{limiter = Limiter}) ->
+    case ?ENABLED(Limiter) andalso emqx_limiter:check(Stats, Limiter) of
+        false -> {ok, State};
+        {ok, Limiter1} ->
+            {ok, State#state{limiter = Limiter1}};
+        {pause, Time, Limiter1} ->
+            ?LOG(debug, "Pause ~pms due to rate limit", [Time]),
+            TRef = start_timer(Time, limit_timeout),
+            NState = State#state{sockstate   = blocked,
+                                 limiter     = Limiter1,
+                                 limit_timer = TRef
+                                },
+            {reply, [{active, false}], NState}
+    end.
+
+%%--------------------------------------------------------------------
+%% Run GC and Check OOM
+
+run_gc(Stats, State = #state{gc_state = GcSt}) ->
+    case ?ENABLED(GcSt) andalso emqx_gc:run(Stats, GcSt) of
+        false -> State;
+        {IsGC, GcSt1} ->
+            IsGC andalso emqx_metrics:inc('channel.gc.cnt'),
+            State#state{gc_state = GcSt1}
+    end.
+
+check_oom(State = #state{channel = Channel}) ->
+    #{zone := Zone} = emqx_channel:info(clientinfo, Channel),
+    OomPolicy = emqx_zone:oom_policy(Zone),
+    case ?ENABLED(OomPolicy) andalso emqx_misc:check_oom(OomPolicy) of
+        Shutdown = {shutdown, _Reason} ->
+            erlang:send(self(), Shutdown);
+        _Other -> ok
+    end,
+    State.
 
 %%--------------------------------------------------------------------
 %% Parse incoming data
@@ -326,7 +445,7 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
         {more, NParseState} ->
             {ok, State#state{parse_state = NParseState}};
         {ok, Packet, Rest, NParseState} ->
-            self() ! {incoming, Packet},
+            erlang:send(self(), {incoming, Packet}),
             parse_incoming(Rest, State#state{parse_state = NParseState})
     catch
         error:Reason:Stk ->
@@ -337,52 +456,60 @@ parse_incoming(Data, State = #state{parse_state = ParseState}) ->
     end.
 
 %%--------------------------------------------------------------------
-%% Handle incoming packets
+%% Handle incoming packet
 
-handle_incoming(Packet = ?PACKET(Type), State = #state{channel = Channel}) ->
-    _ = inc_incoming_stats(Type),
-    _ = emqx_metrics:inc_recv(Packet),
+handle_incoming(Packet, State = #state{active_n = ActiveN, channel = Channel})
+  when is_record(Packet, mqtt_packet) ->
     ?LOG(debug, "RECV ~s", [emqx_packet:format(Packet)]),
-    handle_return(emqx_channel:handle_in(Packet, Channel), State);
+    ok = inc_incoming_stats(Packet),
+    (emqx_pd:get_counter(incoming_pubs) > ActiveN)
+        andalso erlang:send(self(), rate_limit),
+    Ret = emqx_channel:handle_in(Packet, Channel),
+    handle_chan_return(Ret, State);
 
 handle_incoming(FrameError, State = #state{channel = Channel}) ->
-    handle_return(emqx_channel:handle_in(FrameError, Channel), State).
+    handle_chan_return(emqx_channel:handle_in(FrameError, Channel), State).
 
 %%--------------------------------------------------------------------
 %% Handle channel return
 
-handle_return(ok, State) ->
+handle_chan_return(ok, State) ->
     reply(State);
-handle_return({ok, NChannel}, State) ->
+handle_chan_return({ok, NChannel}, State) ->
     reply(State#state{channel= NChannel});
-handle_return({ok, Replies, NChannel}, State) ->
+handle_chan_return({ok, Replies, NChannel}, State) ->
     reply(Replies, State#state{channel= NChannel});
-handle_return({shutdown, Reason, NChannel}, State) ->
+handle_chan_return({shutdown, Reason, NChannel}, State) ->
     stop(Reason, State#state{channel = NChannel});
-handle_return({shutdown, Reason, OutPacket, NChannel}, State) ->
+handle_chan_return({shutdown, Reason, OutPacket, NChannel}, State) ->
     NState = State#state{channel = NChannel},
     stop(Reason, enqueue(OutPacket, NState)).
 
 %%--------------------------------------------------------------------
 %% Handle outgoing packets
 
-handle_outgoing(Packets, State = #state{channel = Channel}) ->
+handle_outgoing(Packets, State = #state{active_n = ActiveN}) ->
     IoData = lists:map(serialize_and_inc_stats_fun(State), Packets),
     Oct = iolist_size(IoData),
     ok = inc_sent_stats(length(Packets), Oct),
-    NChannel = emqx_channel:sent(Oct, Channel),
-    {{binary, IoData}, State#state{channel = NChannel}}.
+    case emqx_pd:get_counter(outgoing_pubs) > ActiveN of
+        true ->
+            OutStats = #{cnt => emqx_pd:reset_counter(outgoing_pubs),
+                         oct => emqx_pd:reset_counter(outgoing_bytes)
+                        },
+            erlang:send(self(), {check_gc, OutStats});
+        false -> ok
+    end,
+    {{binary, IoData}, ensure_stats_timer(State)}.
 
-%% TODO: Duplicated with emqx_channel:serialize_and_inc_stats_fun/1
 serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
-    fun(Packet = ?PACKET(Type)) ->
+    fun(Packet) ->
         case Serialize(Packet) of
             <<>> -> ?LOG(warning, "~s is discarded due to the frame is too large!",
                          [emqx_packet:format(Packet)]),
                     <<>>;
-            Data -> _ = inc_outgoing_stats(Type),
-                    _ = emqx_metrics:inc_sent(Packet),
-                    ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
+            Data -> ?LOG(debug, "SEND ~s", [emqx_packet:format(Packet)]),
+                    ok = inc_outgoing_stats(Packet),
                     Data
         end
     end.
@@ -398,23 +525,33 @@ serialize_and_inc_stats_fun(#state{serialize = Serialize}) ->
           ]}).
 
 inc_recv_stats(Cnt, Oct) ->
-    emqx_pd:update_counter(recv_cnt, Cnt),
-    emqx_pd:update_counter(recv_oct, Oct),
+    emqx_pd:inc_counter(incoming_bytes, Oct),
+    emqx_pd:inc_counter(recv_cnt, Cnt),
+    emqx_pd:inc_counter(recv_oct, Oct),
     emqx_metrics:inc('bytes.received', Oct).
 
-inc_incoming_stats(Type) ->
-    emqx_pd:update_counter(recv_pkt, 1),
-    (Type == ?PUBLISH)
-        andalso emqx_pd:update_counter(recv_msg, 1).
-
-inc_outgoing_stats(Type) ->
-    emqx_pd:update_counter(send_pkt, 1),
-    (Type == ?PUBLISH)
-        andalso emqx_pd:update_counter(send_msg, 1).
+inc_incoming_stats(Packet = ?PACKET(Type)) ->
+    emqx_pd:inc_counter(recv_pkt, 1),
+    if Type == ?PUBLISH ->
+           emqx_pd:inc_counter(recv_msg, 1),
+           emqx_pd:inc_counter(incoming_pubs, 1);
+       true -> ok
+    end,
+    emqx_metrics:inc_recv(Packet).
+
+inc_outgoing_stats(Packet = ?PACKET(Type)) ->
+    emqx_pd:inc_counter(send_pkt, 1),
+    if Type == ?PUBLISH ->
+           emqx_pd:inc_counter(send_msg, 1),
+           emqx_pd:inc_counter(outgoing_pubs, 1);
+       true -> ok
+    end,
+    emqx_metrics:inc_sent(Packet).
 
 inc_sent_stats(Cnt, Oct) ->
-    emqx_pd:update_counter(send_cnt, Cnt),
-    emqx_pd:update_counter(send_oct, Oct),
+    emqx_pd:inc_counter(outgoing_bytes, Oct),
+    emqx_pd:inc_counter(send_cnt, Cnt),
+    emqx_pd:inc_counter(send_oct, Oct),
     emqx_metrics:inc('bytes.sent', Oct).
 
 %%--------------------------------------------------------------------
@@ -451,6 +588,9 @@ enqueue(Packet, State) when is_record(Packet, mqtt_packet) ->
 enqueue(Packets, State = #state{pendings = Pendings}) ->
     State#state{pendings = lists:append(Pendings, Packets)}.
 
+shutdown(Reason, State) ->
+    stop({shutdown, Reason}, State).
+
 stop(Reason, State = #state{pendings = []}) ->
     {stop, State#state{stop_reason = Reason}};
 stop(Reason, State = #state{pendings = Pendings}) ->