Просмотр исходного кода

Add 'active_n' option to optimize the CPU usage of emqx_connection (#2060)

* Add 'active_n' option to optimize the CPU usage of emqx_connection

* Supports batch processing 'DOWN' events
Feng Lee 7 лет назад
Родитель
Сommit
721b72b96a

+ 1 - 1
Makefile

@@ -36,7 +36,7 @@ CT_SUITES = emqx emqx_client emqx_zone emqx_banned emqx_session \
 			emqx_mqtt_props emqx_mqueue emqx_net emqx_pqueue emqx_router emqx_sm \
 			emqx_tables emqx_time emqx_topic emqx_trie emqx_vm emqx_mountpoint \
 			emqx_listeners emqx_protocol emqx_pool emqx_shared_sub emqx_bridge \
-			emqx_hooks emqx_batch emqx_sequence
+			emqx_hooks emqx_batch emqx_sequence emqx_pmon
 
 CT_NODE_NAME = emqxct@127.0.0.1
 CT_OPTS = -cover test/ct.cover.spec -erl_args -name $(CT_NODE_NAME)

+ 15 - 0
etc/emqx.conf

@@ -645,6 +645,11 @@ listener.tcp.external.max_connections = 1024000
 ## Value: Number
 listener.tcp.external.max_conn_rate = 1000
 
+## Specify the {active, N} option for the external MQTT/TCP Socket.
+##
+## Value: Number
+listener.tcp.external.active_n = 100
+
 ## Zone of the external MQTT/TCP listener belonged to.
 ##
 ## See: zone.$name.*
@@ -781,6 +786,11 @@ listener.tcp.internal.max_connections = 10240000
 ## Value: Number
 listener.tcp.internal.max_conn_rate = 1000
 
+## Specify the {active, N} option for the internal MQTT/TCP Socket.
+##
+## Value: Number
+listener.tcp.internal.active_n = 1000
+
 ## Zone of the internal MQTT/TCP listener belonged to.
 ##
 ## Value: String
@@ -888,6 +898,11 @@ listener.ssl.external.max_connections = 102400
 ## Value: Number
 listener.ssl.external.max_conn_rate = 500
 
+## Specify the {active, N} option for the internal MQTT/SSL Socket.
+##
+## Value: Number
+listener.ssl.external.active_n = 100
+
 ## Zone of the external MQTT/SSL listener belonged to.
 ##
 ## Value: String

+ 11 - 0
priv/emqx.schema

@@ -772,6 +772,11 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.tcp.$name.active_n", "emqx.listeners", [
+  {default, 100},
+  {datatype, integer}
+]}.
+
 {mapping, "listener.tcp.$name.zone", "emqx.listeners", [
   {datatype, string}
 ]}.
@@ -867,6 +872,11 @@ end}.
   {datatype, integer}
 ]}.
 
+{mapping, "listener.ssl.$name.active_n", "emqx.listeners", [
+  {default, 100},
+  {datatype, integer}
+]}.
+
 {mapping, "listener.ssl.$name.zone", "emqx.listeners", [
   {datatype, string}
 ]}.
@@ -1283,6 +1293,7 @@ end}.
                           {mqtt_path, cuttlefish:conf_get(Prefix ++ ".mqtt_path", Conf, undefined)},
                           {max_connections, cuttlefish:conf_get(Prefix ++ ".max_connections", Conf)},
                           {max_conn_rate, cuttlefish:conf_get(Prefix ++ ".max_conn_rate", Conf, undefined)},
+                          {active_n, cuttlefish:conf_get(Prefix ++ ".active_n", Conf, undefined)},
                           {tune_buffer, cuttlefish:conf_get(Prefix ++ ".tune_buffer", Conf, undefined)},
                           {zone, Atom(cuttlefish:conf_get(Prefix ++ ".zone", Conf, undefined))},
                           {rate_limit, Ratelimit(cuttlefish:conf_get(Prefix ++ ".rate_limit", Conf, undefined))},

+ 22 - 22
src/emqx_banned.erl

@@ -26,17 +26,16 @@
 
 -export([start_link/0]).
 -export([check/1]).
--export([add/1, del/1]).
+-export([add/1, delete/1]).
 
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 
 -define(TAB, ?MODULE).
--define(SERVER, ?MODULE).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Mnesia bootstrap
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 mnesia(boot) ->
     ok = ekka_mnesia:create_table(?TAB, [
@@ -52,7 +51,7 @@ mnesia(copy) ->
 %% @doc Start the banned server.
 -spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
-    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
 
 -spec(check(emqx_types:credentials()) -> boolean()).
 check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) ->
@@ -64,25 +63,25 @@ check(#{client_id := ClientId, username := Username, peername := {IPAddr, _}}) -
 add(Banned) when is_record(Banned, banned) ->
     mnesia:dirty_write(?TAB, Banned).
 
--spec(del({client_id, emqx_types:client_id()} |
-          {username, emqx_types:username()} |
-          {peername, emqx_types:peername()}) -> ok).
-del(Key) ->
+-spec(delete({client_id, emqx_types:client_id()}
+           | {username, emqx_types:username()}
+           | {peername, emqx_types:peername()}) -> ok).
+delete(Key) ->
     mnesia:dirty_delete(?TAB, Key).
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% gen_server callbacks
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 init([]) ->
     {ok, ensure_expiry_timer(#{expiry_timer => undefined})}.
 
 handle_call(Req, _From, State) ->
-    emqx_logger:error("[BANNED] unexpected call: ~p", [Req]),
+    emqx_logger:error("[Banned] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
 handle_cast(Msg, State) ->
-    emqx_logger:error("[BANNED] unexpected msg: ~p", [Msg]),
+    emqx_logger:error("[Banned] unexpected msg: ~p", [Msg]),
     {noreply, State}.
 
 handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
@@ -90,7 +89,7 @@ handle_info({timeout, TRef, expire}, State = #{expiry_timer := TRef}) ->
     {noreply, ensure_expiry_timer(State), hibernate};
 
 handle_info(Info, State) ->
-    emqx_logger:error("[BANNED] unexpected info: ~p", [Info]),
+    emqx_logger:error("[Banned] unexpected info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, #{expiry_timer := TRef}) ->
@@ -99,21 +98,22 @@ terminate(_Reason, #{expiry_timer := TRef}) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 %% Internal functions
-%%--------------------------------------------------------------------
+%%------------------------------------------------------------------------------
 
 -ifdef(TEST).
 ensure_expiry_timer(State) ->
     State#{expiry_timer := emqx_misc:start_timer(timer:seconds(2), expire)}.
 -else.
 ensure_expiry_timer(State) ->
-    State#{expiry_timer := emqx_misc:start_timer(timer:minutes(5), expire)}.
+    State#{expiry_timer := emqx_misc:start_timer(timer:minutes(1), expire)}.
 -endif.
 
 expire_banned_items(Now) ->
-    mnesia:foldl(fun
-            (B = #banned{until = Until}, _Acc) when Until < Now ->
-                mnesia:delete_object(?TAB, B, sticky_write);
-            (_, _Acc) -> ok
-        end, ok, ?TAB).
+    mnesia:foldl(
+      fun(B = #banned{until = Until}, _Acc) when Until < Now ->
+              mnesia:delete_object(?TAB, B, sticky_write);
+         (_, _Acc) -> ok
+      end, ok, ?TAB).
+

+ 1 - 1
src/emqx_broker.erl

@@ -371,7 +371,7 @@ cast(Broker, Msg) ->
 
 %% Pick a broker
 pick(Topic) ->
-    gproc_pool:pick_worker(emqx_broker_pool, Topic).
+    gproc_pool:pick_worker(broker_pool, Topic).
 
 %%------------------------------------------------------------------------------
 %% gen_server callbacks

+ 21 - 12
src/emqx_broker_helper.erl

@@ -31,6 +31,8 @@
 -define(SUBSEQ, emqx_subseq).
 -define(SHARD, 1024).
 
+-define(BATCH_SIZE, 10000).
+
 -spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?HELPER}, ?MODULE, [], []).
@@ -106,14 +108,12 @@ handle_cast(Msg, State) ->
     emqx_logger:error("[BrokerHelper] unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
-handle_info({'DOWN', _MRef, process, SubPid, Reason}, State = #{pmon := PMon}) ->
-    case ets:lookup(?SUBMON, SubPid) of
-        [{_, SubId}] ->
-            ok = emqx_pool:async_submit(fun subscriber_down/2, [SubPid, SubId]);
-        [] ->
-            emqx_logger:error("[BrokerHelper] unexpected DOWN: ~p, reason: ~p", [SubPid, Reason])
-    end,
-    {noreply, State#{pmon := emqx_pmon:erase(SubPid, PMon)}};
+handle_info({'DOWN', _MRef, process, SubPid, _Reason}, State = #{pmon := PMon}) ->
+    SubPids = [SubPid | emqx_misc:drain_down(?BATCH_SIZE)],
+    ok = emqx_pool:async_submit(
+           fun lists:foreach/2, [fun clean_down/1, SubPids]),
+    {_, PMon1} = emqx_pmon:erase_all(SubPids, PMon),
+    {noreply, State#{pmon := PMon1}};
 
 handle_info(Info, State) ->
     emqx_logger:error("[BrokerHelper] unexpected info: ~p", [Info]),
@@ -126,8 +126,17 @@ terminate(_Reason, _State) ->
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
 
-subscriber_down(SubPid, SubId) ->
-    true = ets:delete(?SUBMON, SubPid),
-    true = (SubId =:= undefined) orelse ets:delete_object(?SUBID, {SubId, SubPid}),
-    emqx_broker:subscriber_down(SubPid).
+%%------------------------------------------------------------------------------
+%% Internal functions
+%%------------------------------------------------------------------------------
+
+clean_down(SubPid) ->
+    case ets:lookup(?SUBMON, SubPid) of
+        [{_, SubId}] ->
+            true = ets:delete(?SUBMON, SubPid),
+            true = (SubId =:= undefined)
+                orelse ets:delete_object(?SUBID, {SubId, SubPid}),
+            emqx_broker:subscriber_down(SubPid);
+        [] -> ok
+    end.
 

+ 2 - 2
src/emqx_broker_sup.erl

@@ -30,9 +30,9 @@ start_link() ->
 init([]) ->
     %% Broker pool
     PoolSize = emqx_vm:schedulers() * 2,
-    BrokerPool = emqx_pool_sup:spec(broker_pool,
-                                    [emqx_broker_pool, hash, PoolSize,
+    BrokerPool = emqx_pool_sup:spec([broker_pool, hash, PoolSize,
                                      {emqx_broker, start_link, []}]),
+
     %% Shared subscription
     SharedSub = #{id       => shared_sub,
                   start    => {emqx_shared_sub, start_link, []},

+ 80 - 73
src/emqx_cm.erl

@@ -20,74 +20,57 @@
 
 -export([start_link/0]).
 
--export([lookup_connection/1]).
 -export([register_connection/1, register_connection/2]).
--export([unregister_connection/1]).
--export([get_conn_attrs/1, set_conn_attrs/2]).
--export([get_conn_stats/1, set_conn_stats/2]).
+-export([unregister_connection/1, unregister_connection/2]).
+-export([get_conn_attrs/1, get_conn_attrs/2]).
+-export([set_conn_attrs/2, set_conn_attrs/3]).
+-export([get_conn_stats/1, get_conn_stats/2]).
+-export([set_conn_stats/2, set_conn_stats/3]).
 -export([lookup_conn_pid/1]).
 
+%% gen_server callbacks
 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
          code_change/3]).
 
 %% internal export
--export([update_conn_stats/0]).
+-export([stats_fun/0]).
 
 -define(CM, ?MODULE).
 
-%% ETS Tables.
+%% ETS tables for connection management.
 -define(CONN_TAB, emqx_conn).
 -define(CONN_ATTRS_TAB, emqx_conn_attrs).
 -define(CONN_STATS_TAB, emqx_conn_stats).
 
+-define(BATCH_SIZE, 10000).
+
 %% @doc Start the connection manager.
 -spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?CM}, ?MODULE, [], []).
 
-%% @doc Lookup a connection.
--spec(lookup_connection(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})).
-lookup_connection(ClientId) when is_binary(ClientId) ->
-    ets:lookup(?CONN_TAB, ClientId).
+%%------------------------------------------------------------------------------
+%% API
+%%------------------------------------------------------------------------------
 
 %% @doc Register a connection.
--spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok).
+-spec(register_connection(emqx_types:client_id()) -> ok).
 register_connection(ClientId) when is_binary(ClientId) ->
-    register_connection({ClientId, self()});
+    register_connection(ClientId, self()).
 
-register_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
-    true = ets:insert(?CONN_TAB, Conn),
+-spec(register_connection(emqx_types:client_id(), pid()) -> ok).
+register_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) ->
+    true = ets:insert(?CONN_TAB, {ClientId, ConnPid}),
     notify({registered, ClientId, ConnPid}).
 
--spec(register_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}, list()) -> ok).
-register_connection(ClientId, Attrs) when is_binary(ClientId) ->
-    register_connection({ClientId, self()}, Attrs);
-register_connection(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
-    set_conn_attrs(Conn, Attrs),
-    register_connection(Conn).
-
-%% @doc Get conn attrs
--spec(get_conn_attrs({emqx_types:client_id(), pid()}) -> list()).
-get_conn_attrs(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
-    try
-        ets:lookup_element(?CONN_ATTRS_TAB, Conn, 2)
-    catch
-        error:badarg -> []
-    end.
-
-%% @doc Set conn attrs
-set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) ->
-    set_conn_attrs({ClientId, self()}, Attrs);
-set_conn_attrs(Conn = {ClientId, ConnPid}, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
-    ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}).
-
-%% @doc Unregister a conn.
--spec(unregister_connection(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok).
+%% @doc Unregister a connection.
+-spec(unregister_connection(emqx_types:client_id()) -> ok).
 unregister_connection(ClientId) when is_binary(ClientId) ->
-    unregister_connection({ClientId, self()});
+    unregister_connection(ClientId, self()).
 
-unregister_connection(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
-    do_unregister_connection(Conn),
+-spec(unregister_connection(emqx_types:client_id(), pid()) -> ok).
+unregister_connection(ClientId, ConnPid) when is_binary(ClientId), is_pid(ConnPid) ->
+    true = do_unregister_connection({ClientId, ConnPid}),
     notify({unregistered, ConnPid}).
 
 do_unregister_connection(Conn) ->
@@ -95,30 +78,52 @@ do_unregister_connection(Conn) ->
     true = ets:delete(?CONN_ATTRS_TAB, Conn),
     true = ets:delete_object(?CONN_TAB, Conn).
 
-%% @doc Lookup connection pid
--spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined).
-lookup_conn_pid(ClientId) when is_binary(ClientId) ->
-    case ets:lookup(?CONN_TAB, ClientId) of
-        [] -> undefined;
-        [{_, Pid}] -> Pid
-    end.
+%% @doc Get conn attrs
+-spec(get_conn_attrs(emqx_types:client_id()) -> list()).
+get_conn_attrs(ClientId) when is_binary(ClientId) ->
+    ConnPid = lookup_conn_pid(ClientId),
+    get_conn_attrs(ClientId, ConnPid).
+
+-spec(get_conn_attrs(emqx_types:client_id(), pid()) -> list()).
+get_conn_attrs(ClientId, ConnPid) when is_binary(ClientId) ->
+    emqx_tables:lookup_value(?CONN_ATTRS_TAB, {ClientId, ConnPid}, []).
+
+%% @doc Set conn attrs
+-spec(set_conn_attrs(emqx_types:client_id(), list()) -> true).
+set_conn_attrs(ClientId, Attrs) when is_binary(ClientId) ->
+    set_conn_attrs(ClientId, self(), Attrs).
+
+-spec(set_conn_attrs(emqx_types:client_id(), pid(), list()) -> true).
+set_conn_attrs(ClientId, ConnPid, Attrs) when is_binary(ClientId), is_pid(ConnPid) ->
+    Conn = {ClientId, ConnPid},
+    ets:insert(?CONN_ATTRS_TAB, {Conn, Attrs}).
 
 %% @doc Get conn stats
--spec(get_conn_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
-get_conn_stats(Conn = {ClientId, ConnPid}) when is_binary(ClientId), is_pid(ConnPid) ->
-    try ets:lookup_element(?CONN_STATS_TAB, Conn, 2)
-    catch
-        error:badarg -> []
-    end.
+-spec(get_conn_stats(emqx_types:client_id()) -> list(emqx_stats:stats())).
+get_conn_stats(ClientId) when is_binary(ClientId) ->
+    ConnPid = lookup_conn_pid(ClientId),
+    get_conn_stats(ClientId, ConnPid).
+
+-spec(get_conn_stats(emqx_types:client_id(), pid()) -> list(emqx_stats:stats())).
+get_conn_stats(ClientId, ConnPid) when is_binary(ClientId) ->
+    Conn = {ClientId, ConnPid},
+    emqx_tables:lookup_value(?CONN_STATS_TAB, Conn, []).
 
 %% @doc Set conn stats.
--spec(set_conn_stats(emqx_types:client_id(), list(emqx_stats:stats())) -> boolean()).
+-spec(set_conn_stats(emqx_types:client_id(), list(emqx_stats:stats())) -> true).
 set_conn_stats(ClientId, Stats) when is_binary(ClientId) ->
-    set_conn_stats({ClientId, self()}, Stats);
+    set_conn_stats(ClientId, self(), Stats).
 
-set_conn_stats(Conn = {ClientId, ConnPid}, Stats) when is_binary(ClientId), is_pid(ConnPid) ->
+-spec(set_conn_stats(emqx_types:client_id(), pid(), list(emqx_stats:stats())) -> true).
+set_conn_stats(ClientId, ConnPid, Stats) when is_binary(ClientId), is_pid(ConnPid) ->
+    Conn = {ClientId, ConnPid},
     ets:insert(?CONN_STATS_TAB, {Conn, Stats}).
 
+%% @doc Lookup connection pid.
+-spec(lookup_conn_pid(emqx_types:client_id()) -> pid() | undefined).
+lookup_conn_pid(ClientId) when is_binary(ClientId) ->
+    emqx_tables:lookup_value(?CONN_TAB, ClientId).
+
 notify(Msg) ->
     gen_server:cast(?CM, {notify, Msg}).
 
@@ -131,7 +136,7 @@ init([]) ->
     ok = emqx_tables:new(?CONN_TAB, [{read_concurrency, true} | TabOpts]),
     ok = emqx_tables:new(?CONN_ATTRS_TAB, TabOpts),
     ok = emqx_tables:new(?CONN_STATS_TAB, TabOpts),
-    ok = emqx_stats:update_interval(cm_stats, fun ?MODULE:update_conn_stats/0),
+    ok = emqx_stats:update_interval(conn_stats, fun ?MODULE:stats_fun/0),
     {ok, #{conn_pmon => emqx_pmon:new()}}.
 
 handle_call(Req, _From, State) ->
@@ -148,26 +153,19 @@ handle_cast(Msg, State) ->
     emqx_logger:error("[CM] unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
-handle_info({'DOWN', _MRef, process, ConnPid, _Reason}, State = #{conn_pmon := PMon}) ->
-    case emqx_pmon:find(ConnPid, PMon) of
-        undefined ->
-            {noreply, State};
-        ClientId ->
-            Conn = {ClientId, ConnPid},
-            case ets:member(?CONN_ATTRS_TAB, Conn) of
-                true ->
-                    ok = emqx_pool:async_submit(fun do_unregister_connection/1, [Conn]);
-                false -> ok
-            end,
-            {noreply, State#{conn_pmon := emqx_pmon:erase(ConnPid, PMon)}}
-    end;
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{conn_pmon := PMon}) ->
+    ConnPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
+    {Items, PMon1} = emqx_pmon:erase_all(ConnPids, PMon),
+    ok = emqx_pool:async_submit(
+           fun lists:foreach/2, [fun clean_down/1, Items]),
+    {noreply, State#{conn_pmon := PMon1}};
 
 handle_info(Info, State) ->
     emqx_logger:error("[CM] unexpected info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, _State) ->
-    emqx_stats:cancel_update(cm_stats).
+    emqx_stats:cancel_update(conn_stats).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -176,7 +174,16 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%------------------------------------------------------------------------------
 
-update_conn_stats() ->
+clean_down({Pid, ClientId}) ->
+    Conn = {ClientId, Pid},
+    case ets:member(?CONN_TAB, ClientId)
+         orelse ets:member(?CONN_ATTRS_TAB, Conn) of
+        true ->
+            do_unregister_connection(Conn);
+        false -> false
+    end.
+
+stats_fun() ->
     case ets:info(?CONN_TAB, size) of
         undefined -> ok;
         Size -> emqx_stats:setstat('connections/count', 'connections/max', Size)

+ 12 - 12
src/emqx_cm_sup.erl

@@ -25,17 +25,17 @@ start_link() ->
     supervisor:start_link({local, ?MODULE}, ?MODULE, []).
 
 init([]) ->
-    Banned = #{id       => banned,
-               start    => {emqx_banned, start_link, []},
-               restart  => permanent,
-               shutdown => 5000,
-               type     => worker,
-               modules  => [emqx_banned]},
-    Manager = #{id       => manager,
-                start    => {emqx_cm, start_link, []},
-                restart  => permanent,
-                shutdown => 5000,
-                type     => worker,
-                modules  => [emqx_cm]},
+    Banned = #{id => banned,
+               start => {emqx_banned, start_link, []},
+               restart => permanent,
+               shutdown => 1000,
+               type => worker,
+               modules => [emqx_banned]},
+    Manager = #{id => manager,
+                start => {emqx_cm, start_link, []},
+                restart => permanent,
+                shutdown => 2000,
+                type => worker,
+                modules => [emqx_cm]},
     {ok, {{one_for_one, 10, 100}, [Banned, Manager]}}.
 

+ 33 - 22
src/emqx_connection.erl

@@ -38,7 +38,7 @@
           peername,
           sockname,
           conn_state,
-          await_recv,
+          active_n,
           proto_state,
           parser_state,
           keepalive,
@@ -51,6 +51,7 @@
           idle_timeout
          }).
 
+-define(DEFAULT_ACTIVE_N, 100).
 -define(SOCK_STATS, [recv_oct, recv_cnt, send_oct, send_cnt, send_pend]).
 
 start_link(Transport, Socket, Options) ->
@@ -69,7 +70,7 @@ info(#state{transport     = Transport,
             peername      = Peername,
             sockname      = Sockname,
             conn_state    = ConnState,
-            await_recv    = AwaitRecv,
+            active_n      = ActiveN,
             rate_limit    = RateLimit,
             publish_limit = PubLimit,
             proto_state   = ProtoState}) ->
@@ -77,7 +78,7 @@ info(#state{transport     = Transport,
                 {peername, Peername},
                 {sockname, Sockname},
                 {conn_state, ConnState},
-                {await_recv, AwaitRecv},
+                {active_n, ActiveN},
                 {rate_limit, esockd_rate_limit:info(RateLimit)},
                 {publish_limit, esockd_rate_limit:info(PubLimit)}],
     ProtoInfo = emqx_protocol:info(ProtoState),
@@ -87,8 +88,8 @@ info(#state{transport     = Transport,
 attrs(CPid) when is_pid(CPid) ->
     call(CPid, attrs);
 
-attrs(#state{peername    = Peername,
-             sockname    = Sockname,
+attrs(#state{peername = Peername,
+             sockname = Sockname,
              proto_state = ProtoState}) ->
     SockAttrs = [{peername, Peername},
                  {sockname, Sockname}],
@@ -129,6 +130,7 @@ init([Transport, RawSocket, Options]) ->
             Peercert = Transport:ensure_ok_or_exit(peercert, [Socket]),
             RateLimit = init_limiter(proplists:get_value(rate_limit, Options)),
             PubLimit = init_limiter(emqx_zone:get_env(Zone, publish_limit)),
+            ActiveN = proplists:get_value(active_n, Options, ?DEFAULT_ACTIVE_N),
             EnableStats = emqx_zone:get_env(Zone, enable_stats, true),
             IdleTimout = emqx_zone:get_env(Zone, idle_timeout, 30000),
             SendFun = send_fun(Transport, Socket),
@@ -140,8 +142,8 @@ init([Transport, RawSocket, Options]) ->
             State = run_socket(#state{transport     = Transport,
                                       socket        = Socket,
                                       peername      = Peername,
-                                      await_recv    = false,
                                       conn_state    = running,
+                                      active_n      = ActiveN,
                                       rate_limit    = RateLimit,
                                       publish_limit = PubLimit,
                                       proto_state   = ProtoState,
@@ -243,19 +245,26 @@ handle_info({shutdown, conflict, {ClientId, NewPid}}, State) ->
     ?LOG(warning, "clientid '~s' conflict with ~p", [ClientId, NewPid]),
     shutdown(conflict, State);
 
-handle_info(activate_sock, State) ->
-    {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})};
-
-handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
+handle_info({tcp, _Sock, Data}, State) ->
     ?LOG(debug, "RECV ~p", [Data]),
     Size = iolist_size(Data),
     emqx_metrics:trans(inc, 'bytes/received', Size),
     Incoming = #{bytes => Size, packets => 0},
-    handle_packet(Data, State#state{await_recv = false, incoming = Incoming});
+    handle_packet(Data, State#state{incoming = Incoming});
+
+%% Rate limit here, cool:)
+handle_info({tcp_passive, _Sock}, State) ->
+    {noreply, ensure_rate_limit(State)};
 
-handle_info({inet_async, _Sock, _Ref, {error, Reason}}, State) ->
+handle_info({tcp_error, _Sock, Reason}, State) ->
     shutdown(Reason, State);
 
+handle_info({tcp_closed, _Sock}, State) ->
+    shutdown(closed, State);
+
+handle_info(activate_sock, State) ->
+    {noreply, run_socket(State#state{conn_state = running, limit_timer = undefined})};
+
 handle_info({inet_reply, _Sock, ok}, State) ->
     {noreply, State};
 
@@ -314,16 +323,17 @@ code_change(_OldVsn, State, _Extra) ->
 %%------------------------------------------------------------------------------
 
 %% Receive and parse data
-handle_packet(<<>>, State0) ->
-    State = ensure_stats_timer(ensure_rate_limit(State0)),
-    ok = maybe_gc(State, incoming),
-    {noreply, State};
+handle_packet(<<>>, State) ->
+    NState = ensure_stats_timer(State),
+    ok = maybe_gc(NState, incoming),
+    {noreply, NState};
+
 handle_packet(Data, State = #state{proto_state  = ProtoState,
                                    parser_state = ParserState,
                                    idle_timeout = IdleTimeout}) ->
     case catch emqx_frame:parse(Data, ParserState) of
         {more, NewParserState} ->
-            {noreply, run_socket(State#state{parser_state = NewParserState}), IdleTimeout};
+            {noreply, State#state{parser_state = NewParserState}, IdleTimeout};
         {ok, Packet = ?PACKET(Type), Rest} ->
             emqx_metrics:received(Packet),
             case emqx_protocol:received(Packet, ProtoState) of
@@ -352,6 +362,7 @@ reset_parser(State = #state{proto_state = ProtoState}) ->
 inc_publish_cnt(Type, State = #state{incoming = Incoming = #{packets := Cnt}})
     when Type == ?PUBLISH; Type == ?SUBSCRIBE ->
     State#state{incoming = Incoming#{packets := Cnt + 1}};
+
 inc_publish_cnt(_Type, State) ->
     State.
 
@@ -379,11 +390,11 @@ ensure_rate_limit([{Rl, Pos, Num}|Limiters], State) ->
 
 run_socket(State = #state{conn_state = blocked}) ->
     State;
-run_socket(State = #state{await_recv = true}) ->
-    State;
-run_socket(State = #state{transport = Transport, socket = Socket}) ->
-    Transport:async_recv(Socket, 0, infinity),
-    State#state{await_recv = true}.
+run_socket(State = #state{transport = Transport,
+                          socket = Socket,
+                          active_n = ActiveN}) ->
+    Transport:setopts(Socket, [{active, ActiveN}]),
+    State.
 
 %%------------------------------------------------------------------------------
 %% Ensure stats timer

+ 18 - 0
src/emqx_misc.erl

@@ -19,6 +19,8 @@
 
 -export([init_proc_mng_policy/1, conn_proc_mng_policy/1]).
 
+-export([drain_down/1]).
+
 %% @doc Merge options
 -spec(merge_opts(list(), list()) -> list()).
 merge_opts(Defaults, Options) ->
@@ -108,3 +110,19 @@ is_enabled(Max) -> is_integer(Max) andalso Max > ?DISABLED.
 proc_info(Key) ->
     {Key, Value} = erlang:process_info(self(), Key),
     Value.
+
+-spec(drain_down(pos_integer()) -> list(pid())).
+drain_down(Cnt) when Cnt > 0 ->
+    drain_down(Cnt, []).
+
+drain_down(0, Acc) ->
+    lists:reverse(Acc);
+
+drain_down(Cnt, Acc) ->
+    receive
+        {'DOWN', _MRef, process, Pid, _Reason} ->
+            drain_down(Cnt - 1, [Pid|Acc])
+    after 0 ->
+          lists:reverse(Acc)
+    end.
+

+ 28 - 10
src/emqx_pmon.erl

@@ -14,24 +14,27 @@
 
 -module(emqx_pmon).
 
+-compile({no_auto_import, [monitor/3]}).
+
 -export([new/0]).
 -export([monitor/2, monitor/3]).
 -export([demonitor/2]).
 -export([find/2]).
--export([erase/2]).
-
--compile({no_auto_import,[monitor/3]}).
+-export([erase/2, erase_all/2]).
+-export([count/1]).
 
 -type(pmon() :: {?MODULE, map()}).
 -export_type([pmon/0]).
 
 -spec(new() -> pmon()).
-new() -> {?MODULE, maps:new()}.
+new() ->
+    {?MODULE, maps:new()}.
 
 -spec(monitor(pid(), pmon()) -> pmon()).
 monitor(Pid, PM) ->
-    monitor(Pid, undefined, PM).
+    ?MODULE:monitor(Pid, undefined, PM).
 
+-spec(monitor(pid(), term(), pmon()) -> pmon()).
 monitor(Pid, Val, {?MODULE, PM}) ->
     {?MODULE, case maps:is_key(Pid, PM) of
                   true  -> PM;
@@ -43,21 +46,36 @@ monitor(Pid, Val, {?MODULE, PM}) ->
 demonitor(Pid, {?MODULE, PM}) ->
     {?MODULE, case maps:find(Pid, PM) of
                   {ok, {Ref, _Val}} ->
-                      %% Don't flush
-                      _ = erlang:demonitor(Ref),
+                      %% flush
+                      _ = erlang:demonitor(Ref, [flush]),
                       maps:remove(Pid, PM);
                   error -> PM
               end}.
 
--spec(find(pid(), pmon()) -> undefined | term()).
+-spec(find(pid(), pmon()) -> error | {ok, term()}).
 find(Pid, {?MODULE, PM}) ->
     case maps:find(Pid, PM) of
         {ok, {_Ref, Val}} ->
-            Val;
-        error -> undefined
+            {ok, Val};
+        error -> error
     end.
 
 -spec(erase(pid(), pmon()) -> pmon()).
 erase(Pid, {?MODULE, PM}) ->
     {?MODULE, maps:remove(Pid, PM)}.
 
+-spec(erase_all([pid()], pmon()) -> {[{pid(), term()}], pmon()}).
+erase_all(Pids, PMon0) ->
+    lists:foldl(
+      fun(Pid, {Acc, PMon}) ->
+          case find(Pid, PMon) of
+              {ok, Val} ->
+                  {[{Pid, Val}|Acc], erase(Pid, PMon)};
+              error -> {Acc, PMon}
+          end
+      end, {[], PMon0}, Pids).
+
+-spec(count(pmon()) -> non_neg_integer()).
+count({?MODULE, PM}) ->
+    maps:size(PM).
+

+ 0 - 2
src/emqx_pool_sup.erl

@@ -39,8 +39,6 @@ start_link(Pool, Type, MFA) ->
 
 -spec(start_link(atom() | tuple(), atom(), pos_integer(), mfa())
       -> {ok, pid()} | {error, term()}).
-start_link(Pool, Type, Size, MFA) when is_atom(Pool) ->
-    supervisor:start_link({local, Pool}, ?MODULE, [Pool, Type, Size, MFA]);
 start_link(Pool, Type, Size, MFA) ->
     supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]).
 

+ 11 - 10
src/emqx_protocol.erl

@@ -320,7 +320,8 @@ process_packet(?CONNECT_PACKET(
                       case try_open_session(PState3) of
                           {ok, SPid, SP} ->
                               PState4 = PState3#pstate{session = SPid, connected = true},
-                              ok = emqx_cm:register_connection(client_id(PState4), attrs(PState4)),
+                              ok = emqx_cm:register_connection(client_id(PState4)),
+                              true = emqx_cm:set_conn_attrs(client_id(PState4), attrs(PState4)),
                               %% Start keepalive
                               start_keepalive(Keepalive, PState4),
                               %% Success
@@ -497,18 +498,18 @@ do_publish(Packet = ?PUBLISH_PACKET(QoS, PacketId),
 
 puback(?QOS_0, _PacketId, _Result, PState) ->
     {ok, PState};
-puback(?QOS_1, PacketId, {error, ReasonCode}, PState) ->
-    deliver({puback, PacketId, ReasonCode}, PState);
-puback(?QOS_1, PacketId, {ok, []}, PState) ->
+puback(?QOS_1, PacketId, [], PState) ->
     deliver({puback, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
-puback(?QOS_1, PacketId, {ok, _}, PState) ->
+puback(?QOS_1, PacketId, [_|_], PState) -> %%TODO: check the dispatch?
     deliver({puback, PacketId, ?RC_SUCCESS}, PState);
-puback(?QOS_2, PacketId, {error, ReasonCode}, PState) ->
-    deliver({pubrec, PacketId, ReasonCode}, PState);
-puback(?QOS_2, PacketId, {ok, []}, PState) ->
+puback(?QOS_1, PacketId, {error, ReasonCode}, PState) ->
+    deliver({puback, PacketId, ReasonCode}, PState);
+puback(?QOS_2, PacketId, [], PState) ->
     deliver({pubrec, PacketId, ?RC_NO_MATCHING_SUBSCRIBERS}, PState);
-puback(?QOS_2, PacketId, {ok, _}, PState) ->
-    deliver({pubrec, PacketId, ?RC_SUCCESS}, PState).
+puback(?QOS_2, PacketId, [_|_], PState) -> %%TODO: check the dispatch?
+    deliver({pubrec, PacketId, ?RC_SUCCESS}, PState);
+puback(?QOS_2, PacketId, {error, ReasonCode}, PState) ->
+    deliver({pubrec, PacketId, ReasonCode}, PState).
 
 %%------------------------------------------------------------------------------
 %% Deliver Packet -> Client

+ 1 - 1
src/emqx_router.erl

@@ -148,7 +148,7 @@ call(Router, Msg) ->
     gen_server:call(Router, Msg, infinity).
 
 pick(Topic) ->
-    gproc_pool:pick_worker(emqx_router_pool, Topic).
+    gproc_pool:pick_worker(router_pool, Topic).
 
 %%------------------------------------------------------------------------------
 %% gen_server callbacks

+ 2 - 2
src/emqx_router_sup.erl

@@ -17,6 +17,7 @@
 -behaviour(supervisor).
 
 -export([start_link/0]).
+
 -export([init/1]).
 
 start_link() ->
@@ -32,8 +33,7 @@ init([]) ->
                modules  => [emqx_router_helper]},
 
     %% Router pool
-    RouterPool = emqx_pool_sup:spec(router_pool,
-                                    [emqx_router_pool, hash, emqx_vm:schedulers(),
+    RouterPool = emqx_pool_sup:spec([router_pool, hash,
                                      {emqx_router, start_link, []}]),
     {ok, {{one_for_all, 0, 1}, [Helper, RouterPool]}}.
 

+ 3 - 2
src/emqx_session.erl

@@ -259,7 +259,7 @@ subscribe(SPid, PacketId, Properties, TopicFilters) ->
 
 %% @doc Called by connection processes when publishing messages
 -spec(publish(spid(), emqx_mqtt_types:packet_id(), emqx_types:message())
-      -> {ok, emqx_types:deliver_results()}).
+      -> emqx_types:deliver_results() | {error, term()}).
 publish(_SPid, _PacketId, Msg = #message{qos = ?QOS_0}) ->
     %% Publish QoS0 message directly
     emqx_broker:publish(Msg);
@@ -370,7 +370,8 @@ init([Parent, #{zone                := Zone,
                    topic_alias_maximum = TopicAliasMaximum,
                    will_msg            = WillMsg
                   },
-    ok = emqx_sm:register_session(ClientId, attrs(State)),
+    ok = emqx_sm:register_session(ClientId, self()),
+    true = emqx_sm:set_session_attrs(ClientId, attrs(State)),
     true = emqx_sm:set_session_stats(ClientId, stats(State)),
     emqx_hooks:run('session.created', [#{client_id => ClientId}, info(State)]),
     GcPolicy = emqx_zone:get_env(Zone, force_gc_policy, false),

+ 125 - 99
src/emqx_sm.erl

@@ -21,12 +21,15 @@
 -export([start_link/0]).
 
 -export([open_session/1, close_session/1]).
--export([lookup_session/1, lookup_session_pid/1]).
 -export([resume_session/2]).
 -export([discard_session/1, discard_session/2]).
--export([register_session/2, unregister_session/1]).
--export([get_session_attrs/1, set_session_attrs/2]).
--export([get_session_stats/1, set_session_stats/2]).
+-export([register_session/1, register_session/2]).
+-export([unregister_session/1, unregister_session/2]).
+-export([get_session_attrs/1, get_session_attrs/2,
+         set_session_attrs/2, set_session_attrs/3]).
+-export([get_session_stats/1, get_session_stats/2,
+         set_session_stats/2, set_session_stats/3]).
+-export([lookup_session_pids/1]).
 
 %% Internal functions for rpc
 -export([dispatch/3]).
@@ -46,6 +49,8 @@
 -define(SESSION_ATTRS_TAB, emqx_session_attrs).
 -define(SESSION_STATS_TAB, emqx_session_stats).
 
+-define(BATCH_SIZE, 10000).
+
 -spec(start_link() -> emqx_types:startlink_ret()).
 start_link() ->
     gen_server:start_link({local, ?SM}, ?MODULE, [], []).
@@ -62,8 +67,8 @@ open_session(SessAttrs = #{clean_start := true, client_id := ClientId, conn_pid
 open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) ->
     ResumeStart = fun(_) ->
                       case resume_session(ClientId, SessAttrs) of
-                          {ok, SPid} ->
-                              {ok, SPid, true};
+                          {ok, SessPid} ->
+                              {ok, SessPid, true};
                           {error, not_found} ->
                               emqx_session:start_link(SessAttrs)
                       end
@@ -75,76 +80,68 @@ open_session(SessAttrs = #{clean_start := false, client_id := ClientId}) ->
 discard_session(ClientId) when is_binary(ClientId) ->
     discard_session(ClientId, self()).
 
+-spec(discard_session(emqx_types:client_id(), pid()) -> ok).
 discard_session(ClientId, ConnPid) when is_binary(ClientId) ->
     lists:foreach(
-      fun({_ClientId, SPid}) ->
-              case catch emqx_session:discard(SPid, ConnPid) of
-                  {Err, Reason} when Err =:= 'EXIT'; Err =:= error ->
-                      emqx_logger:error("[SM] Failed to discard ~p: ~p", [SPid, Reason]);
-                  ok -> ok
-              end
-      end, lookup_session(ClientId)).
+      fun(SessPid) ->
+          try emqx_session:discard(SessPid, ConnPid)
+          catch
+              _:Error:_Stk ->
+                  emqx_logger:error("[SM] Failed to discard ~p: ~p", [SessPid, Error])
+          end
+      end, lookup_session_pids(ClientId)).
 
 %% @doc Try to resume a session.
 -spec(resume_session(emqx_types:client_id(), map()) -> {ok, pid()} | {error, term()}).
 resume_session(ClientId, SessAttrs = #{conn_pid := ConnPid}) ->
-    case lookup_session(ClientId) of
+    case lookup_session_pids(ClientId) of
         [] -> {error, not_found};
-        [{_ClientId, SPid}] ->
-            ok = emqx_session:resume(SPid, SessAttrs),
-            {ok, SPid};
-        Sessions ->
-            [{_, SPid}|StaleSessions] = lists:reverse(Sessions),
-            emqx_logger:error("[SM] More than one session found: ~p", [Sessions]),
-            lists:foreach(fun({_, StalePid}) ->
+        [SessPid] ->
+            ok = emqx_session:resume(SessPid, SessAttrs),
+            {ok, SessPid};
+        SessPids ->
+            [SessPid|StalePids] = lists:reverse(SessPids),
+            emqx_logger:error("[SM] More than one session found: ~p", [SessPids]),
+            lists:foreach(fun(StalePid) ->
                               catch emqx_session:discard(StalePid, ConnPid)
-                          end, StaleSessions),
-            ok = emqx_session:resume(SPid, SessAttrs),
-            {ok, SPid}
+                          end, StalePids),
+            ok = emqx_session:resume(SessPid, SessAttrs),
+            {ok, SessPid}
     end.
 
 %% @doc Close a session.
--spec(close_session({emqx_types:client_id(), pid()} | pid()) -> ok).
-close_session({_ClientId, SPid}) ->
-    emqx_session:close(SPid);
-close_session(SPid) when is_pid(SPid) ->
-    emqx_session:close(SPid).
-
-%% @doc Register a session with attributes.
--spec(register_session(emqx_types:client_id() | {emqx_types:client_id(), pid()},
-                       list(emqx_session:attr())) -> ok).
-register_session(ClientId, SessAttrs) when is_binary(ClientId) ->
-    register_session({ClientId, self()}, SessAttrs);
-
-register_session(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) ->
-    true = ets:insert(?SESSION_TAB, Session),
-    true = ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}),
-    true = proplists:get_value(clean_start, SessAttrs, true)
-            orelse ets:insert(?SESSION_P_TAB, Session),
-    ok = emqx_sm_registry:register_session(Session),
-    notify({registered, ClientId, SPid}).
+-spec(close_session(emqx_types:client_id() | pid()) -> ok).
+close_session(ClientId) when is_binary(ClientId) ->
+    case lookup_session_pids(ClientId) of
+        [] -> ok;
+        [SessPid] -> close_session(SessPid);
+        SessPids -> lists:foreach(fun close_session/1, SessPids)
+    end;
 
-%% @doc Get session attrs
--spec(get_session_attrs({emqx_types:client_id(), pid()}) -> list(emqx_session:attr())).
-get_session_attrs(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
-    emqx_tables:lookup_value(?SESSION_ATTRS_TAB, Session, []).
+close_session(SessPid) when is_pid(SessPid) ->
+    emqx_session:close(SessPid).
 
-%% @doc Set session attrs
--spec(set_session_attrs(emqx_types:client_id() | {emqx_types:client_id(), pid()},
-                        list(emqx_session:attr())) -> true).
-set_session_attrs(ClientId, SessAttrs) when is_binary(ClientId) ->
-    set_session_attrs({ClientId, self()}, SessAttrs);
-set_session_attrs(Session = {ClientId, SPid}, SessAttrs) when is_binary(ClientId), is_pid(SPid) ->
-    ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}).
+%% @doc Register a session.
+-spec(register_session(emqx_types:client_id()) -> ok).
+register_session(ClientId) when is_binary(ClientId) ->
+    register_session(ClientId, self()).
+
+-spec(register_session(emqx_types:client_id(), pid()) -> ok).
+register_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) ->
+    Session = {ClientId, SessPid},
+    true = ets:insert(?SESSION_TAB, Session),
+    ok = emqx_sm_registry:register_session(Session),
+    notify({registered, ClientId, SessPid}).
 
 %% @doc Unregister a session
--spec(unregister_session(emqx_types:client_id() | {emqx_types:client_id(), pid()}) -> ok).
+-spec(unregister_session(emqx_types:client_id()) -> ok).
 unregister_session(ClientId) when is_binary(ClientId) ->
-    unregister_session({ClientId, self()});
+    unregister_session(ClientId, self()).
 
-unregister_session(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
-    ok = do_unregister_session(Session),
-    notify({unregistered, ClientId, SPid}).
+-spec(unregister_session(emqx_types:client_id(), pid()) -> ok).
+unregister_session(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) ->
+    ok = do_unregister_session({ClientId, SessPid}),
+    notify({unregistered, SessPid}).
 
 %% @private
 do_unregister_session(Session) ->
@@ -154,42 +151,69 @@ do_unregister_session(Session) ->
     true = ets:delete_object(?SESSION_TAB, Session),
     emqx_sm_registry:unregister_session(Session).
 
+%% @doc Get session attrs
+-spec(get_session_attrs(emqx_types:client_id()) -> list(emqx_session:attr())).
+get_session_attrs(ClientId) when is_binary(ClientId) ->
+    case lookup_session_pids(ClientId) of
+        [] -> [];
+        [SessPid|_] -> get_session_attrs(ClientId, SessPid)
+    end.
+
+-spec(get_session_attrs(emqx_types:client_id(), pid()) -> list(emqx_session:attr())).
+get_session_attrs(ClientId, SessPid) when is_binary(ClientId), is_pid(SessPid) ->
+    emqx_tables:lookup_value(?SESSION_ATTRS_TAB, {ClientId, SessPid}, []).
+
+%% @doc Set session attrs
+-spec(set_session_attrs(emqx_types:client_id(), list(emqx_session:attr())) -> true).
+set_session_attrs(ClientId, SessAttrs) when is_binary(ClientId) ->
+    set_session_attrs(ClientId, self(), SessAttrs).
+
+-spec(set_session_attrs(emqx_types:client_id(), pid(), list(emqx_session:attr())) -> true).
+set_session_attrs(ClientId, SessPid, SessAttrs) when is_binary(ClientId), is_pid(SessPid) ->
+    Session = {ClientId, SessPid},
+    true = ets:insert(?SESSION_ATTRS_TAB, {Session, SessAttrs}),
+    proplists:get_value(clean_start, SessAttrs, true) orelse ets:insert(?SESSION_P_TAB, Session).
+
 %% @doc Get session stats
--spec(get_session_stats({emqx_types:client_id(), pid()}) -> list(emqx_stats:stats())).
-get_session_stats(Session = {ClientId, SPid}) when is_binary(ClientId), is_pid(SPid) ->
-    emqx_tables:lookup_value(?SESSION_STATS_TAB, Session, []).
+-spec(get_session_stats(emqx_types:client_id()) -> list(emqx_stats:stats())).
+get_session_stats(ClientId) when is_binary(ClientId) ->
+    case lookup_session_pids(ClientId) of
+        [] -> [];
+        [SessPid|_] ->
+            get_session_stats(ClientId, SessPid)
+    end.
+
+-spec(get_session_stats(emqx_types:client_id(), pid()) -> list(emqx_stats:stats())).
+get_session_stats(ClientId, SessPid) when is_binary(ClientId) ->
+    emqx_tables:lookup_value(?SESSION_STATS_TAB, {ClientId, SessPid}, []).
 
 %% @doc Set session stats
--spec(set_session_stats(emqx_types:client_id() | {emqx_types:client_id(), pid()},
-                        emqx_stats:stats()) -> true).
+-spec(set_session_stats(emqx_types:client_id(), emqx_stats:stats()) -> true).
 set_session_stats(ClientId, Stats) when is_binary(ClientId) ->
-    set_session_stats({ClientId, self()}, Stats);
-set_session_stats(Session = {ClientId, SPid}, Stats) when is_binary(ClientId), is_pid(SPid) ->
-    ets:insert(?SESSION_STATS_TAB, {Session, Stats}).
+    set_session_stats(ClientId, self(), Stats).
 
-%% @doc Lookup a session from registry
--spec(lookup_session(emqx_types:client_id()) -> list({emqx_types:client_id(), pid()})).
-lookup_session(ClientId) ->
+-spec(set_session_stats(emqx_types:client_id(), pid(), emqx_stats:stats()) -> true).
+set_session_stats(ClientId, SessPid, Stats) when is_binary(ClientId), is_pid(SessPid) ->
+    ets:insert(?SESSION_STATS_TAB, {{ClientId, SessPid}, Stats}).
+
+%% @doc Lookup session pid.
+-spec(lookup_session_pids(emqx_types:client_id()) -> list(pid())).
+lookup_session_pids(ClientId) ->
     case emqx_sm_registry:is_enabled() of
         true -> emqx_sm_registry:lookup_session(ClientId);
-        false -> ets:lookup(?SESSION_TAB, ClientId)
+        false -> ets:lookup(?SESSION_TAB, ClientId, [])
     end.
 
 %% @doc Dispatch a message to the session.
 -spec(dispatch(emqx_types:client_id(), emqx_topic:topic(), emqx_types:message()) -> any()).
 dispatch(ClientId, Topic, Msg) ->
-    case lookup_session_pid(ClientId) of
-        Pid when is_pid(Pid) ->
-            Pid ! {dispatch, Topic, Msg};
-        undefined ->
+    case lookup_session_pids(ClientId) of
+        [SessPid|_] when is_pid(SessPid) ->
+            SessPid ! {dispatch, Topic, Msg};
+        [] ->
             emqx_hooks:run('message.dropped', [#{client_id => ClientId}, Msg])
     end.
 
-%% @doc Lookup session pid.
--spec(lookup_session_pid(emqx_types:client_id()) -> pid() | undefined).
-lookup_session_pid(ClientId) ->
-    emqx_tables:lookup_value(?SESSION_TAB, ClientId).
-
 notify(Event) ->
     gen_server:cast(?SM, {notify, Event}).
 
@@ -203,43 +227,36 @@ init([]) ->
     ok = emqx_tables:new(?SESSION_P_TAB, TabOpts),
     ok = emqx_tables:new(?SESSION_ATTRS_TAB, TabOpts),
     ok = emqx_tables:new(?SESSION_STATS_TAB, TabOpts),
-    ok = emqx_stats:update_interval(sm_stats, fun ?MODULE:stats_fun/0),
+    ok = emqx_stats:update_interval(sess_stats, fun ?MODULE:stats_fun/0),
     {ok, #{sess_pmon => emqx_pmon:new()}}.
 
 handle_call(Req, _From, State) ->
     emqx_logger:error("[SM] unexpected call: ~p", [Req]),
     {reply, ignored, State}.
 
-handle_cast({notify, {registered, ClientId, SPid}}, State = #{sess_pmon := PMon}) ->
-    {noreply, State#{sess_pmon := emqx_pmon:monitor(SPid, ClientId, PMon)}};
+handle_cast({notify, {registered, ClientId, SessPid}}, State = #{sess_pmon := PMon}) ->
+    {noreply, State#{sess_pmon := emqx_pmon:monitor(SessPid, ClientId, PMon)}};
 
-handle_cast({notify, {unregistered, _ClientId, SPid}}, State = #{sess_pmon := PMon}) ->
-    {noreply, State#{sess_pmon := emqx_pmon:demonitor(SPid, PMon)}};
+handle_cast({notify, {unregistered, SessPid}}, State = #{sess_pmon := PMon}) ->
+    {noreply, State#{sess_pmon := emqx_pmon:demonitor(SessPid, PMon)}};
 
 handle_cast(Msg, State) ->
     emqx_logger:error("[SM] unexpected cast: ~p", [Msg]),
     {noreply, State}.
 
-handle_info({'DOWN', _MRef, process, DownPid, _Reason}, State = #{sess_pmon := PMon}) ->
-    case emqx_pmon:find(DownPid, PMon) of
-        undefined ->
-            {noreply, State};
-        ClientId ->
-            Session = {ClientId, DownPid},
-            case ets:member(?SESSION_ATTRS_TAB, Session) of
-                true ->
-                    ok = emqx_pool:async_submit(fun do_unregister_session/1, [Session]);
-                false -> ok
-            end,
-            {noreply, State#{sess_pmon := emqx_pmon:erase(DownPid, PMon)}}
-    end;
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #{sess_pmon := PMon}) ->
+    SessPids = [Pid | emqx_misc:drain_down(?BATCH_SIZE)],
+    {Items, PMon1} = emqx_pmon:erase_all(SessPids, PMon),
+    ok = emqx_pool:async_submit(
+           fun lists:foreach/2, [fun clean_down/1, Items]),
+    {noreply, State#{sess_pmon := PMon1}};
 
 handle_info(Info, State) ->
     emqx_logger:error("[SM] unexpected info: ~p", [Info]),
     {noreply, State}.
 
 terminate(_Reason, _State) ->
-    emqx_stats:cancel_update(sm_stats).
+    emqx_stats:cancel_update(sess_stats).
 
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.
@@ -248,6 +265,15 @@ code_change(_OldVsn, State, _Extra) ->
 %% Internal functions
 %%------------------------------------------------------------------------------
 
+clean_down({SessPid, ClientId}) ->
+    Session = {ClientId, SessPid},
+    case ets:member(?SESSION_TAB, ClientId)
+         orelse ets:member(?SESSION_ATTRS_TAB, Session) of
+        true ->
+            do_unregister_session(Session);
+        false -> ok
+    end.
+
 stats_fun() ->
     safe_update_stats(?SESSION_TAB, 'sessions/count', 'sessions/max'),
     safe_update_stats(?SESSION_P_TAB, 'sessions/persistent/count', 'sessions/persistent/max').

+ 2 - 3
src/emqx_sm_registry.erl

@@ -43,10 +43,9 @@ start_link() ->
 is_enabled() ->
     emqx_config:get_env(enable_session_registry, true).
 
--spec(lookup_session(emqx_types:client_id())
-      -> list({emqx_types:client_id(), session_pid()})).
+-spec(lookup_session(emqx_types:client_id()) -> list(session_pid())).
 lookup_session(ClientId) ->
-    [{ClientId, SessPid} || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)].
+    [SessPid || #global_session{pid = SessPid} <- mnesia:dirty_read(?TAB, ClientId)].
 
 -spec(register_session({emqx_types:client_id(), session_pid()}) -> ok).
 register_session({ClientId, SessPid}) when is_binary(ClientId), is_pid(SessPid) ->

+ 18 - 11
test/emqx_banned_SUITE.erl

@@ -18,9 +18,7 @@
 -compile(nowarn_export_all).
 
 -include("emqx.hrl").
-
 -include("emqx_mqtt.hrl").
-
 -include_lib("eunit/include/eunit.hrl").
 
 all() -> [t_banned_all].
@@ -29,18 +27,27 @@ t_banned_all(_) ->
     emqx_ct_broker_helpers:run_setup_steps(),
     emqx_banned:start_link(),
     TimeNow = erlang:system_time(second),
-    Banned = #banned{who    = {client_id, <<"TestClient">>},
+    Banned = #banned{who = {client_id, <<"TestClient">>},
                      reason = <<"test">>,
-                     by     = <<"banned suite">>,
-                     desc   = <<"test">>,
-                     until  = TimeNow + 1},
+                     by = <<"banned suite">>,
+                     desc = <<"test">>,
+                     until = TimeNow + 1},
     ok = emqx_banned:add(Banned),
     % here is not expire banned test because its check interval is greater than 5 mins, but its effect has been confirmed
-    ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
+    ?assert(emqx_banned:check(#{client_id => <<"TestClient">>,
+                                username => undefined,
+                                peername => {undefined, undefined}})),
     timer:sleep(2500),
-    ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
+    ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>,
+                                   username => undefined,
+                                   peername => {undefined, undefined}})),
     ok = emqx_banned:add(Banned),
-    ?assert(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
-    emqx_banned:del({client_id, <<"TestClient">>}),
-    ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>, username => undefined, peername => {undefined, undefined}})),
+    ?assert(emqx_banned:check(#{client_id => <<"TestClient">>,
+                                username => undefined,
+                                peername => {undefined, undefined}})),
+    emqx_banned:delete({client_id, <<"TestClient">>}),
+    ?assertNot(emqx_banned:check(#{client_id => <<"TestClient">>,
+                                   username => undefined,
+                                   peername => {undefined, undefined}})),
     emqx_ct_broker_helpers:run_teardown_steps().
+

+ 12 - 10
test/emqx_cm_SUITE.erl

@@ -24,14 +24,16 @@ all() -> [t_register_unregister_connection].
 t_register_unregister_connection(_) ->
     {ok, _} = emqx_cm_sup:start_link(),
     Pid = self(),
-    emqx_cm:register_connection(<<"conn1">>),
-    emqx_cm:register_connection({<<"conn2">>, Pid}, [{port, 8080}, {ip, "192.168.0.1"}]),
+    ok = emqx_cm:register_connection(<<"conn1">>),
+    ok emqx_cm:register_connection(<<"conn2">>, Pid),
+    true = emqx_cm:set_conn_attrs(<<"conn1">>, [{port, 8080}, {ip, "192.168.0.1"}]),
+    true = emqx_cm:set_conn_attrs(<<"conn2">>, Pid, [{port, 8080}, {ip, "192.168.0.1"}]),
     timer:sleep(2000),
-    [{<<"conn1">>, Pid}] = emqx_cm:lookup_connection(<<"conn1">>),
-    [{<<"conn2">>, Pid}] = emqx_cm:lookup_connection(<<"conn2">>),
-    Pid = emqx_cm:lookup_conn_pid(<<"conn1">>),
-    emqx_cm:unregister_connection(<<"conn1">>),
-    [] = emqx_cm:lookup_connection(<<"conn1">>),
-    [{port, 8080}, {ip, "192.168.0.1"}] = emqx_cm:get_conn_attrs({<<"conn2">>, Pid}),
-    emqx_cm:set_conn_stats(<<"conn2">>, [[{count, 1}, {max, 2}]]),
-    [[{count, 1}, {max, 2}]] = emqx_cm:get_conn_stats({<<"conn2">>, Pid}).
+    ?assertEqual(Pid, emqx_cm:lookup_conn_pid(<<"conn1">>)),
+    ?assertEqual(Pid, emqx_cm:lookup_conn_pid(<<"conn2">>)),
+    ok = emqx_cm:unregister_connection(<<"conn1">>),
+    ?assertEqual(undefined, emqx_cm:lookup_conn_pid(<<"conn1">>)),
+    ?assertEqual([{port, 8080}, {ip, "192.168.0.1"}], emqx_cm:get_conn_attrs({<<"conn2">>, Pid})),
+    true = emqx_cm:set_conn_stats(<<"conn2">>, [{count, 1}, {max, 2}]),
+    ?assertEqual([{count, 1}, {max, 2}], emqx_cm:get_conn_stats({<<"conn2">>, Pid})).
+

+ 48 - 0
test/emqx_pmon_SUITE.erl

@@ -0,0 +1,48 @@
+%% Copyright (c) 2018 EMQ Technologies Co., Ltd. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%%     http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+
+-module(emqx_pmon_SUITE).
+
+-compile(export_all).
+-compile(nowarn_export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+    [t_monitor, t_find, t_erase].
+
+t_monitor(_) ->
+    PMon = emqx_pmon:new(),
+    PMon1 = emqx_pmon:monitor(self(), PMon),
+    ?assertEqual(1, emqx_pmon:count(PMon1)),
+    PMon2 = emqx_pmon:demonitor(self(), PMon1),
+    ?assertEqual(0, emqx_pmon:count(PMon2)).
+
+t_find(_) ->
+    PMon = emqx_pmon:new(),
+    PMon1 = emqx_pmon:monitor(self(), val, PMon),
+    ?assertEqual(1, emqx_pmon:count(PMon1)),
+    ?assertEqual({ok, val}, emqx_pmon:find(self(), PMon1)),
+    PMon2 = emqx_pmon:erase(self(), PMon1),
+    ?assertEqual(error, emqx_pmon:find(self(), PMon2)).
+
+t_erase(_) ->
+    PMon = emqx_pmon:new(),
+    PMon1 = emqx_pmon:monitor(self(), val, PMon),
+    PMon2 = emqx_pmon:erase(self(), PMon1),
+    ?assertEqual(0, emqx_pmon:count(PMon2)),
+    {Items, PMon3} = emqx_pmon:erase_all([self()], PMon1),
+    ?assertEqual([{self(), val}], Items),
+    ?assertEqual(0, emqx_pmon:count(PMon3)).
+

+ 1 - 1
test/emqx_sequence_SUITE.erl

@@ -34,5 +34,5 @@ sequence_generate(_) ->
     ?assertEqual(0, reclaim(seqtab, key)),
     ?assertEqual(false, ets:member(seqtab, key)),
     ?assertEqual(1, nextval(seqtab, key)),
-    ?assert(emqx_sequence:delete(seqtab).
+    ?assert(emqx_sequence:delete(seqtab)).
 

+ 7 - 9
test/emqx_sm_SUITE.erl

@@ -34,16 +34,14 @@ t_open_close_session(_) ->
               topic_alias_maximum => 0,
               will_msg            => undefined},
     {ok, SPid} = emqx_sm:open_session(Attrs),
-    [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
-    SPid = emqx_sm:lookup_session_pid(<<"client">>),
+    ?assertEqual([SPid], emqx_sm:lookup_session_pids(<<"client">>)),
     {ok, NewConnPid} = emqx_mock_client:start_link(<<"client">>),
     {ok, SPid, true} = emqx_sm:open_session(Attrs#{clean_start => false, conn_pid => NewConnPid}),
-    [{<<"client">>, SPid}] = emqx_sm:lookup_session(<<"client">>),
-    SAttrs = emqx_sm:get_session_attrs({<<"client">>, SPid}),
-    <<"client">> = proplists:get_value(client_id, SAttrs),
-    Session = {<<"client">>, SPid},
-    emqx_sm:set_session_stats(Session, {open, true}),
-    {open, true} = emqx_sm:get_session_stats(Session),
+    ?assertEqual([SPid], emqx_sm:lookup_session_pids(<<"client">>)),
+    SAttrs = emqx_sm:get_session_attrs(<<"client">>, SPid),
+    ?assertEqual(<<"client">>, proplists:get_value(client_id, SAttrs)),
+    emqx_sm:set_session_stats(<<"client">>, SPid, [{inflight, 10}]),
+    ?assertEqual([{inflight, 10}], emqx_sm:get_session_stats(<<"client">>, SPid)),
     ok = emqx_sm:close_session(SPid),
-    [] = emqx_sm:lookup_session(<<"client">>),
+    ?assertEqual([], emqx_sm:lookup_session_pids(<<"client">>)),
     emqx_ct_broker_helpers:run_teardown_steps().